Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "mdd_internal.h"
46
47 static const char dot[] = ".";
48 static const char dotdot[] = "..";
49
50 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
51                         const char *name, const struct lu_fid* fid, int mask);
52 static int
53 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
54                     const char *name, const struct lu_fid* fid, int mask)
55 {
56         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
57         int rc;
58
59         mdd_read_lock(env, mdd_obj);
60         rc = __mdd_lookup(env, pobj, name, fid, mask);
61         mdd_read_unlock(env, mdd_obj);
62
63        return rc;
64 }
65
66 static int mdd_lookup(const struct lu_env *env,
67                       struct md_object *pobj, const char *name,
68                       struct lu_fid* fid)
69 {
70         int rc;
71         ENTRY;
72         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
73         RETURN(rc);
74 }
75
76
77 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
78                           struct lu_fid *fid)
79 {
80         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
81 }
82
83 /*
84  * return 1: if lf is the fid of the ancestor of p1;
85  * return 0: if not;
86  *
87  * return -EREMOTE: if remote object is found, in this
88  * case fid of remote object is saved to @pf;
89  *
90  * otherwise: values < 0, errors.
91  */
92 static int mdd_is_parent(const struct lu_env *env,
93                          struct mdd_device *mdd,
94                          struct mdd_object *p1,
95                          const struct lu_fid *lf,
96                          struct lu_fid *pf)
97 {
98         struct mdd_object *parent = NULL;
99         struct lu_fid *pfid;
100         int rc;
101         ENTRY;
102
103         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
104         pfid = &mdd_env_info(env)->mti_fid;
105
106         /* Do not lookup ".." in root, they do not exist there. */
107         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
108                 RETURN(0);
109
110         for(;;) {
111                 rc = mdd_parent_fid(env, p1, pfid);
112                 if (rc)
113                         GOTO(out, rc);
114                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
115                         GOTO(out, rc = 0);
116                 if (lu_fid_eq(pfid, lf))
117                         GOTO(out, rc = 1);
118                 if (parent)
119                         mdd_object_put(env, parent);
120                 parent = mdd_object_find(env, mdd, pfid);
121
122                 /* cross-ref parent */
123                 if (parent == NULL) {
124                         if (pf != NULL)
125                                 *pf = *pfid;
126                         GOTO(out, rc = EREMOTE);
127                 } else if (IS_ERR(parent))
128                         GOTO(out, rc = PTR_ERR(parent));
129                 p1 = parent;
130         }
131         EXIT;
132 out:
133         if (parent && !IS_ERR(parent))
134                 mdd_object_put(env, parent);
135         return rc;
136 }
137
138 /*
139  * No permission check is needed.
140  *
141  * returns 1: if fid is ancestor of @mo;
142  * returns 0: if fid is not a ancestor of @mo;
143  *
144  * returns EREMOTE if remote object is found, fid of remote object is saved to
145  * @fid;
146  *
147  * returns < 0: if error
148  */
149 static int mdd_is_subdir(const struct lu_env *env,
150                          struct md_object *mo, const struct lu_fid *fid,
151                          struct lu_fid *sfid)
152 {
153         struct mdd_device *mdd = mdo2mdd(mo);
154         int rc;
155         ENTRY;
156
157         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
158                 RETURN(0);
159
160         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
161
162         RETURN(rc);
163 }
164
165 /*Check whether it may create the cobj under the pobj*/
166 static int mdd_may_create(const struct lu_env *env,
167                           struct mdd_object *pobj, struct mdd_object *cobj,
168                           int need_check)
169 {
170         int rc = 0;
171         ENTRY;
172
173         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
174                 RETURN(-EEXIST);
175
176         if (mdd_is_dead_obj(pobj))
177                 RETURN(-ENOENT);
178
179         /*check pobj may create or not*/
180         if (need_check)
181                 rc = mdd_permission_internal(env, pobj,
182                                              MAY_WRITE | MAY_EXEC);
183
184         RETURN(rc);
185 }
186
187 /*
188  * It's inline, so penalty for filesystems that don't use sticky bit is
189  * minimal.
190  */
191 static inline int mdd_is_sticky(const struct lu_env *env,
192                                 struct mdd_object *pobj,
193                                 struct mdd_object *cobj)
194 {
195         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
196         struct md_ucred *uc = md_ucred(env);
197         int rc;
198
199         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
200         if (rc) {
201                 return rc;
202         } else if (tmp_la->la_uid == uc->mu_fsuid) {
203                 return 0;
204         } else {
205                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
206                 if (rc)
207                         return rc;
208                 else if (!(tmp_la->la_mode & S_ISVTX))
209                         return 0;
210                 else if (tmp_la->la_uid == uc->mu_fsuid)
211                         return 0;
212                 else
213                         return !mdd_capable(uc, CAP_FOWNER);
214         }
215 }
216
217 /* Check whether it may delete the cobj under the pobj. */
218 static int mdd_may_delete(const struct lu_env *env,
219                           struct mdd_object *pobj,
220                           struct mdd_object *cobj,
221                           int is_dir, int need_check)
222 {
223         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
224         int rc = 0;
225         ENTRY;
226
227         LASSERT(cobj);
228
229         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
230                 RETURN(-ENOENT);
231
232         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
233                 RETURN(-EPERM);
234
235         if (is_dir) {
236                 if (!S_ISDIR(mdd_object_type(cobj)))
237                         RETURN(-ENOTDIR);
238
239                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
240                         RETURN(-EBUSY);
241
242         } else if (S_ISDIR(mdd_object_type(cobj))) {
243                         RETURN(-EISDIR);
244         }
245
246         if (pobj) {
247                 if (mdd_is_dead_obj(pobj))
248                         RETURN(-ENOENT);
249
250                 if (mdd_is_sticky(env, pobj, cobj))
251                         RETURN(-EPERM);
252
253                 if (need_check)
254                         rc = mdd_permission_internal(env, pobj,
255                                                      MAY_WRITE | MAY_EXEC);
256         }
257         RETURN(rc);
258 }
259
260 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
261                           struct mdd_object *src_obj)
262 {
263         int rc = 0;
264         ENTRY;
265
266         if (tgt_obj) {
267                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
268                 if (rc)
269                         RETURN(rc);
270         }
271
272         if (S_ISDIR(mdd_object_type(src_obj)))
273                 RETURN(-EPERM);
274
275         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
276                 RETURN(-EPERM);
277
278         RETURN(rc);
279 }
280
281 static void mdd_lock2(const struct lu_env *env,
282                       struct mdd_object *o0, struct mdd_object *o1)
283 {
284         mdd_write_lock(env, o0);
285         mdd_write_lock(env, o1);
286 }
287
288 static void mdd_unlock2(const struct lu_env *env,
289                         struct mdd_object *o0, struct mdd_object *o1)
290 {
291         mdd_write_unlock(env, o1);
292         mdd_write_unlock(env, o0);
293 }
294
295 /* insert new index, add reference if isdir, update times */
296 static int __mdd_index_insert(const struct lu_env *env,
297                              struct mdd_object *pobj, const struct lu_fid *lf,
298                              const char *name, int isdir, struct thandle *th,
299                              struct lustre_capa *capa)
300 {
301         struct dt_object *next = mdd_object_child(pobj);
302         struct timeval start;
303         int rc;
304         ENTRY;
305         
306         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start, 
307                              LPROC_MDD_INDEX_INSERT);
308 #if 0
309         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
310 #endif
311
312         if (dt_try_as_dir(env, next))
313                 rc = next->do_index_ops->dio_insert(env, next,
314                                                     (struct dt_rec *)lf,
315                                                     (struct dt_key *)name,
316                                                     th, capa);
317         else
318                 rc = -ENOTDIR;
319
320         if (rc == 0) {
321                 if (isdir)
322                         mdd_ref_add_internal(env, pobj, th);
323 #if 0
324                 la->la_valid = LA_MTIME|LA_CTIME;
325                 la->la_atime = ma->ma_attr.la_atime;
326                 la->la_ctime = ma->ma_attr.la_ctime;
327                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
328 #endif
329         }
330         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
331                            LPROC_MDD_INDEX_INSERT);
332         return rc;
333 }
334
335 static int __mdd_index_delete(const struct lu_env *env,
336                               struct mdd_object *pobj, const char *name,
337                               int is_dir, struct thandle *handle,
338                               struct lustre_capa *capa)
339 {
340         struct dt_object *next = mdd_object_child(pobj);
341         struct timeval start;
342         int rc;
343         ENTRY;
344
345         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start, 
346                              LPROC_MDD_INDEX_DELETE);
347         if (dt_try_as_dir(env, next)) {
348                 rc = next->do_index_ops->dio_delete(env, next,
349                                                     (struct dt_key *)name,
350                                                     handle, capa);
351                 if (rc == 0 && is_dir)
352                         mdd_ref_del_internal(env, pobj, handle);
353         } else
354                 rc = -ENOTDIR;
355         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start, 
356                            LPROC_MDD_INDEX_DELETE);
357         RETURN(rc);
358 }
359
360 static int __mdd_index_insert_only(const struct lu_env *env,
361                                    struct mdd_object *pobj,
362                                    const struct lu_fid *lf,
363                                    const char *name, struct thandle *th,
364                                    struct lustre_capa *capa)
365 {
366         int rc;
367         struct dt_object *next = mdd_object_child(pobj);
368         ENTRY;
369
370         if (dt_try_as_dir(env, next))
371                 rc = next->do_index_ops->dio_insert(env, next,
372                                          (struct dt_rec *)lf,
373                                          (struct dt_key *)name, th, capa);
374         else
375                 rc = -ENOTDIR;
376         RETURN(rc);
377 }
378
379 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
380                     struct md_object *src_obj, const char *name,
381                     struct md_attr *ma)
382 {
383         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
384         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
385         struct mdd_device *mdd = mdo2mdd(src_obj);
386         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
387         struct thandle *handle;
388         int rc;
389         ENTRY;
390
391         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
392         handle = mdd_trans_start(env, mdd);
393         if (IS_ERR(handle))
394                 RETURN(PTR_ERR(handle));
395
396         mdd_lock2(env, mdd_tobj, mdd_sobj);
397
398         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
399         if (rc)
400                 GOTO(out, rc);
401
402         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
403                                      name, handle,
404                                      mdd_object_capa(env, mdd_tobj));
405         if (rc == 0)
406                 mdd_ref_add_internal(env, mdd_sobj, handle);
407
408         *la_copy = ma->ma_attr;
409         la_copy->la_valid = LA_CTIME;
410         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
411         if (rc)
412                 GOTO(out, rc);
413
414         la_copy->la_valid = LA_CTIME | LA_MTIME;
415         rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
416
417 out:
418         mdd_unlock2(env, mdd_tobj, mdd_sobj);
419         mdd_trans_stop(env, mdd, rc, handle);
420         RETURN(rc);
421 }
422
423 /* caller should take a lock before calling */
424 int mdd_finish_unlink(const struct lu_env *env,
425                       struct mdd_object *obj, struct md_attr *ma,
426                       struct thandle *th)
427 {
428         int rc;
429         ENTRY;
430
431         rc = mdd_iattr_get(env, obj, ma);
432         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
433                 /* add new orphan and the object
434                  * will be deleted during the object_put() */
435                 if (__mdd_orphan_add(env, obj, th) == 0)
436                         set_bit(LU_OBJECT_ORPHAN,
437                                 &mdd2lu_obj(obj)->lo_header->loh_flags);
438
439                 if (obj->mod_count == 0)
440                         rc = mdd_object_kill(env, obj, ma);
441         }
442         RETURN(rc);
443 }
444
445 /*
446  * Check that @dir contains no entries except (possibly) dot and dotdot.
447  *
448  * Returns:
449  *
450  *             0        empty
451  *    -ENOTEMPTY        not empty
452  *           -ve        other error
453  *
454  */
455 static int mdd_dir_is_empty(const struct lu_env *env,
456                             struct mdd_object *dir)
457 {
458         struct dt_it     *it;
459         struct dt_object *obj;
460         struct dt_it_ops *iops;
461         int result;
462         ENTRY;
463
464         obj = mdd_object_child(dir);
465         iops = &obj->do_index_ops->dio_it;
466         it = iops->init(env, obj, 0);
467         if (it != NULL) {
468                 result = iops->get(env, it, (const void *)"");
469                 if (result > 0) {
470                         int i;
471                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
472                                 result = iops->next(env, it);
473                         if (result == 0)
474                                 result = -ENOTEMPTY;
475                         else if (result == +1)
476                                 result = 0;
477                 } else if (result == 0)
478                         /*
479                          * Huh? Index contains no zero key?
480                          */
481                         result = -EIO;
482
483                 iops->put(env, it);
484                 iops->fini(env, it);
485         } else
486                 result = -ENOMEM;
487         RETURN(result);
488 }
489
490 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
491                             struct mdd_object *cobj, struct md_attr *ma)
492 {
493         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
494         int rc = 0;
495         ENTRY;
496
497         rc = mdd_may_delete(env, pobj, cobj,
498                             S_ISDIR(ma->ma_attr.la_mode), 1);
499         if (rc)
500                 RETURN(rc);
501
502         if (S_ISDIR(mdd_object_type(cobj))) {
503                 if (dt_try_as_dir(env, dt_cobj))
504                         rc = mdd_dir_is_empty(env, cobj);
505                 else
506                         rc = -ENOTDIR;
507         }
508
509         RETURN(rc);
510 }
511
512 static int mdd_unlink(const struct lu_env *env,
513                       struct md_object *pobj, struct md_object *cobj,
514                       const char *name, struct md_attr *ma)
515 {
516         struct mdd_device *mdd = mdo2mdd(pobj);
517         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
518         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
519         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
520         struct thandle    *handle;
521         int rc, is_dir;
522         ENTRY;
523
524         rc = mdd_log_txn_param_build(env, mdd_cobj, ma, MDD_TXN_UNLINK_OP);
525         if (rc)
526                 RETURN(rc);
527         
528         handle = mdd_trans_start(env, mdd);
529         if (IS_ERR(handle))
530                 RETURN(PTR_ERR(handle));
531
532         mdd_lock2(env, mdd_pobj, mdd_cobj);
533
534         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
535         if (rc)
536                 GOTO(cleanup, rc);
537
538         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
539         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
540                                 mdd_object_capa(env, mdd_pobj));
541         if (rc)
542                 GOTO(cleanup, rc);
543
544         mdd_ref_del_internal(env, mdd_cobj, handle);
545         *la_copy = ma->ma_attr;
546         if (is_dir) {
547                 /* unlink dot */
548                 mdd_ref_del_internal(env, mdd_cobj, handle);
549         } else {
550                 la_copy->la_valid = LA_CTIME;
551                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
552                 if (rc)
553                         GOTO(cleanup, rc);
554         }
555
556         la_copy->la_valid = LA_CTIME | LA_MTIME;
557         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
558         if (rc)
559                 GOTO(cleanup, rc);
560
561         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
562
563         if (rc == 0)
564                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
565                                    strlen("unlinked"), "unlinked", 0,
566                                    NULL, NULL);
567 cleanup:
568         mdd_unlock2(env, mdd_pobj, mdd_cobj);
569         mdd_trans_stop(env, mdd, rc, handle);
570         RETURN(rc);
571 }
572
573 /*
574  * Partial operation. Be aware, this is called with write lock taken, so we use
575  * locksless version of __mdd_lookup() here.
576  */
577 static int mdd_ni_sanity_check(const struct lu_env *env,
578                                struct md_object *pobj,
579                                const char *name,
580                                const struct lu_fid *fid)
581 {
582         struct mdd_object *obj       = md2mdd_obj(pobj);
583 #if 0
584         int rc;
585 #endif
586         ENTRY;
587
588         /* EEXIST check */
589         if (mdd_is_dead_obj(obj))
590                 RETURN(-ENOENT);
591
592          /* The exist of the name will be checked in _index_insert. */
593 #if 0
594         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
595         if (rc != -ENOENT)
596                 RETURN(rc ? : -EEXIST);
597         else
598                 RETURN(0);
599 #endif
600         RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
601 }
602
603 static int mdd_name_insert(const struct lu_env *env,
604                            struct md_object *pobj,
605                            const char *name, const struct lu_fid *fid,
606                            int isdir)
607 {
608         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
609         struct mdd_device *mdd = mdo2mdd(pobj);
610         struct thandle *handle;
611         int rc;
612         ENTRY;
613
614         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
615         handle = mdd_trans_start(env, mdo2mdd(pobj));
616         if (IS_ERR(handle))
617                 RETURN(PTR_ERR(handle));
618
619         mdd_write_lock(env, mdd_obj);
620         rc = mdd_ni_sanity_check(env, pobj, name, fid);
621         if (rc)
622                 GOTO(out_unlock, rc);
623
624         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
625                                 BYPASS_CAPA);
626
627 out_unlock:
628         mdd_write_unlock(env, mdd_obj);
629
630         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
631         RETURN(rc);
632 }
633
634 /*
635  * Be aware, this is called with write lock taken, so we use locksless version
636  * of __mdd_lookup() here.
637  */
638 static int mdd_nr_sanity_check(const struct lu_env *env,
639                                struct md_object *pobj,
640                                const char *name)
641 {
642         struct mdd_object *obj       = md2mdd_obj(pobj);
643 #if 0
644         struct mdd_thread_info *info = mdd_env_info(env);
645         struct lu_fid     *fid       = &info->mti_fid;
646         int rc;
647 #endif
648         ENTRY;
649
650         /* EEXIST check */
651         if (mdd_is_dead_obj(obj))
652                 RETURN(-ENOENT);
653
654          /* The exist of the name will be checked in _index_delete. */
655 #if 0
656         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
657         RETURN(rc);
658 #endif
659         RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
660 }
661
662 static int mdd_name_remove(const struct lu_env *env,
663                            struct md_object *pobj,
664                            const char *name, int is_dir)
665 {
666         struct mdd_device *mdd = mdo2mdd(pobj);
667         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
668         struct thandle *handle;
669         int rc;
670         ENTRY;
671
672         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
673         handle = mdd_trans_start(env, mdd);
674         if (IS_ERR(handle))
675                 RETURN(PTR_ERR(handle));
676
677         mdd_write_lock(env, mdd_obj);
678         rc = mdd_nr_sanity_check(env, pobj, name);
679         if (rc)
680                 GOTO(out_unlock, rc);
681
682         rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
683                                 BYPASS_CAPA);
684
685 out_unlock:
686         mdd_write_unlock(env, mdd_obj);
687
688         mdd_trans_stop(env, mdd, rc, handle);
689         RETURN(rc);
690 }
691 static int mdd_rt_sanity_check(const struct lu_env *env,
692                                struct mdd_object *tgt_pobj,
693                                struct mdd_object *tobj,
694                                const struct lu_fid *sfid,
695                                const char *name, struct md_attr *ma)
696 {
697         int rc, src_is_dir;
698         ENTRY;
699
700         /* EEXIST check */
701         if (mdd_is_dead_obj(tgt_pobj))
702                 RETURN(-ENOENT);
703
704         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
705         if (tobj) {
706                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
707                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
708                      mdd_dir_is_empty(env, tobj))
709                                 RETURN(-ENOTEMPTY);
710         } else {
711                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
712         }
713
714         RETURN(rc);
715 }
716
717 static int mdd_rename_tgt(const struct lu_env *env,
718                           struct md_object *pobj, struct md_object *tobj,
719                           const struct lu_fid *lf, const char *name,
720                           struct md_attr *ma)
721 {
722         struct mdd_device *mdd = mdo2mdd(pobj);
723         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
724         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
725         struct thandle *handle;
726         int rc;
727         ENTRY;
728
729         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
730         handle = mdd_trans_start(env, mdd);
731         if (IS_ERR(handle))
732                 RETURN(PTR_ERR(handle));
733
734         if (mdd_tobj)
735                 mdd_lock2(env, mdd_tpobj, mdd_tobj);
736         else
737                 mdd_write_lock(env, mdd_tpobj);
738
739         /*TODO rename sanity checking*/
740         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
741         if (rc)
742                 GOTO(cleanup, rc);
743
744         /* if rename_tgt is called then we should just re-insert name with
745          * correct fid, no need to dec/inc parent nlink if obj is dir */
746         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
747         if (rc)
748                 GOTO(cleanup, rc);
749
750         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
751                                      BYPASS_CAPA);
752         if (rc)
753                 GOTO(cleanup, rc);
754
755         if (tobj && lu_object_exists(&tobj->mo_lu))
756                 mdd_ref_del_internal(env, mdd_tobj, handle);
757 cleanup:
758         if (tobj)
759                 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
760         else
761                 mdd_write_unlock(env, mdd_tpobj);
762         mdd_trans_stop(env, mdd, rc, handle);
763         RETURN(rc);
764 }
765
766 /*
767  * The permission has been checked when obj created,
768  * no need check again.
769  */
770 static int mdd_cd_sanity_check(const struct lu_env *env,
771                                struct mdd_object *obj)
772 {
773         int rc = 0;
774         ENTRY;
775
776         /* EEXIST check */
777         if (!obj || mdd_is_dead_obj(obj))
778                 RETURN(-ENOENT);
779
780 #if 0
781         mdd_read_lock(env, obj);
782         rc = mdd_permission_internal(env, obj, MAY_WRITE);
783         mdd_read_unlock(env, obj);
784 #endif
785
786         RETURN(rc);
787
788 }
789
790 static int mdd_create_data(const struct lu_env *env,
791                            struct md_object *pobj, struct md_object *cobj,
792                            const struct md_create_spec *spec,
793                            struct md_attr *ma)
794 {
795         struct mdd_device *mdd = mdo2mdd(cobj);
796         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
797         struct mdd_object *son = md2mdd_obj(cobj);
798         struct lu_attr    *attr = &ma->ma_attr;
799         struct lov_mds_md *lmm = NULL;
800         int                lmm_size = 0;
801         struct thandle    *handle;
802         int                rc;
803         ENTRY;
804
805         rc = mdd_cd_sanity_check(env, son);
806         if (rc)
807                 RETURN(rc);
808
809         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
810                         !(spec->sp_cr_flags & FMODE_WRITE))
811                 RETURN(0);
812         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
813                             attr);
814         if (rc)
815                 RETURN(rc);
816
817         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
818         handle = mdd_trans_start(env, mdd);
819         if (IS_ERR(handle))
820                 RETURN(rc = PTR_ERR(handle));
821
822         /*
823          * XXX: Setting the lov ea is not locked but setting the attr is locked?
824          */
825
826         /* Replay creates has objects already */
827         if (spec->u.sp_ea.no_lov_create) {
828                 CDEBUG(D_INFO, "we already have lov ea\n");
829                 rc = mdd_lov_set_md(env, mdd_pobj, son,
830                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
831                                     spec->u.sp_ea.eadatalen, handle, 0);
832         } else
833                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
834                                     lmm_size, handle, 0);
835
836         if (rc == 0)
837                rc = mdd_attr_get_internal_locked(env, son, ma);
838
839         /* Finish mdd_lov_create() stuff. */
840         mdd_lov_create_finish(env, mdd, rc);
841         mdd_trans_stop(env, mdd, rc, handle);
842         if (lmm)
843                 OBD_FREE(lmm, lmm_size);
844         RETURN(rc);
845 }
846
847 static int
848 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
849              const char *name, const struct lu_fid* fid, int mask)
850 {
851         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
852         struct dt_object    *dir = mdd_object_child(mdd_obj);
853         struct dt_rec       *rec = (struct dt_rec *)fid;
854         const struct dt_key *key = (const struct dt_key *)name;
855         struct timeval      start;
856         int rc;
857         ENTRY;
858
859         mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
860         if (mdd_is_dead_obj(mdd_obj))
861                 RETURN(-ESTALE);
862
863         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
864         if (rc == 0)
865                 RETURN(-ESTALE);
866         else if (rc < 0) {
867                 CERROR("Object "DFID" locates on remote server\n",
868                         PFID(mdo2fid(mdd_obj)));
869                 LBUG();
870         }
871
872 #if 0
873         if (mask == MAY_EXEC)
874                 rc = mdd_exec_permission_lite(env, mdd_obj);
875         else
876 #endif
877         rc = mdd_permission_internal(env, mdd_obj, mask);
878         if (rc)
879                 RETURN(rc);
880
881         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
882                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
883                                                    mdd_object_capa(env, mdd_obj));
884         else
885                 rc = -ENOTDIR;
886
887         mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
888         RETURN(rc);
889 }
890
891 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
892                           struct mdd_object *child, struct md_attr *ma, 
893                           struct thandle *handle)
894 {
895         int rc;
896         ENTRY;
897
898         /* update attributes for child.
899          * FIXME:
900          *  (1) the valid bits should be converted between Lustre and Linux;
901          *  (2) maybe, the child attributes should be set in OSD when creation.
902          */
903
904         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
905         if (rc != 0)
906                 RETURN(rc);
907
908         if (S_ISDIR(ma->ma_attr.la_mode)) {
909                 /* add . and .. for newly created dir */
910                 mdd_ref_add_internal(env, child, handle);
911                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
912                                              dot, handle, BYPASS_CAPA);
913                 if (rc == 0) {
914                         rc = __mdd_index_insert_only(env, child, pfid,
915                                                      dotdot, handle,
916                                                      BYPASS_CAPA);
917                         if (rc != 0) {
918                                 int rc2;
919
920                                 rc2 = __mdd_index_delete(env, child, dot, 0,
921                                                          handle, BYPASS_CAPA);
922                                 if (rc2 != 0)
923                                         CERROR("Failure to cleanup after dotdot"
924                                                " creation: %d (%d)\n", rc2, rc);
925                                 else
926                                         mdd_ref_del_internal(env, child, handle);
927                         }
928                 }
929         }
930         RETURN(rc);
931 }
932
933 static int mdd_create_sanity_check(const struct lu_env *env,
934                                    struct md_object *pobj,
935                                    const char *name, struct md_attr *ma)
936 {
937         struct mdd_thread_info *info = mdd_env_info(env);
938         struct lu_attr    *la        = &info->mti_la;
939         struct lu_fid     *fid       = &info->mti_fid;
940         struct mdd_object *obj       = md2mdd_obj(pobj);
941         int rc;
942         ENTRY;
943
944         /* EEXIST check */
945         if (mdd_is_dead_obj(obj))
946                 RETURN(-ENOENT);
947
948         /*
949          * Check if the name already exist, though it will be checked
950          * in _index_insert also, for avoiding rolling back if exists
951          * _index_insert.
952          */
953         rc = __mdd_lookup_locked(env, pobj, name, fid,
954                                  MAY_WRITE | MAY_EXEC);
955         if (rc != -ENOENT)
956                 RETURN(rc ? : -EEXIST);
957
958         /* sgid check */
959         mdd_read_lock(env, obj);
960         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
961         mdd_read_unlock(env, obj);
962         if (rc != 0)
963                 RETURN(rc);
964
965         if (la->la_mode & S_ISGID) {
966                 ma->ma_attr.la_gid = la->la_gid;
967                 if (S_ISDIR(ma->ma_attr.la_mode)) {
968                         ma->ma_attr.la_mode |= S_ISGID;
969                         ma->ma_attr.la_valid |= LA_MODE;
970                 }
971         }
972
973         switch (ma->ma_attr.la_mode & S_IFMT) {
974         case S_IFREG:
975         case S_IFDIR:
976         case S_IFLNK:
977         case S_IFCHR:
978         case S_IFBLK:
979         case S_IFIFO:
980         case S_IFSOCK:
981                 rc = 0;
982                 break;
983         default:
984                 rc = -EINVAL;
985                 break;
986         }
987         RETURN(rc);
988 }
989
990 /*
991  * Create object and insert it into namespace.
992  */
993 static int mdd_create(const struct lu_env *env,
994                       struct md_object *pobj, const char *name,
995                       struct md_object *child,
996                       struct md_create_spec *spec,
997                       struct md_attr* ma)
998 {
999         struct mdd_device *mdd = mdo2mdd(pobj);
1000         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1001         struct mdd_object *son = md2mdd_obj(child);
1002         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1003         struct lu_attr    *attr = &ma->ma_attr;
1004         struct lov_mds_md *lmm = NULL;
1005         struct thandle    *handle;
1006         int rc, created = 0, inserted = 0, lmm_size = 0;
1007         struct timeval  start;
1008         ENTRY;
1009
1010         mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1011         /*
1012          * Two operations have to be performed:
1013          *
1014          *  - allocation of new object (->do_create()), and
1015          *
1016          *  - insertion into parent index (->dio_insert()).
1017          *
1018          * Due to locking, operation order is not important, when both are
1019          * successful, *but* error handling cases are quite different:
1020          *
1021          *  - if insertion is done first, and following object creation fails,
1022          *  insertion has to be rolled back, but this operation might fail
1023          *  also leaving us with dangling index entry.
1024          *
1025          *  - if creation is done first, is has to be undone if insertion
1026          *  fails, leaving us with leaked space, which is neither good, nor
1027          *  fatal.
1028          *
1029          * It seems that creation-first is simplest solution, but it is
1030          * sub-optimal in the frequent
1031          *
1032          *         $ mkdir foo
1033          *         $ mkdir foo
1034          *
1035          * case, because second mkdir is bound to create object, only to
1036          * destroy it immediately.
1037          *
1038          * To avoid this follow local file systems that do double lookup:
1039          *
1040          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1041          *
1042          *     1. create            (mdd_object_create_internal())
1043          *
1044          *     2. insert            (__mdd_index_insert(), lookup again)
1045          */
1046
1047         /* sanity checks before big job */
1048         rc = mdd_create_sanity_check(env, pobj, name, ma);
1049         if (rc)
1050                 RETURN(rc);
1051
1052         /* no RPC inside the transaction, so OST objects should be created at
1053          * first */
1054         if (S_ISREG(attr->la_mode)) {
1055                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1056                                     spec, attr);
1057                 if (rc)
1058                         RETURN(rc);
1059         }
1060
1061         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1062         handle = mdd_trans_start(env, mdd);
1063         if (IS_ERR(handle))
1064                 RETURN(PTR_ERR(handle));
1065
1066         mdd_write_lock(env, mdd_pobj);
1067
1068         /*
1069          * XXX check that link can be added to the parent in mkdir case.
1070          */
1071
1072         mdd_write_lock(env, son);
1073         rc = mdd_object_create_internal(env, son, ma, handle);
1074         if (rc) {
1075                 mdd_write_unlock(env, son);
1076                 GOTO(cleanup, rc);
1077         }
1078
1079         created = 1;
1080
1081 #ifdef CONFIG_FS_POSIX_ACL
1082         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1083         if (rc) {
1084                 mdd_write_unlock(env, son);
1085                 GOTO(cleanup, rc);
1086         } else {
1087                 ma->ma_attr.la_valid |= LA_MODE;
1088         }
1089 #endif
1090
1091         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1092                                      son, ma, handle);
1093         mdd_write_unlock(env, son);
1094         if (rc)
1095                 /*
1096                  * Object has no links, so it will be destroyed when last
1097                  * reference is released. (XXX not now.)
1098                  */
1099                 GOTO(cleanup, rc);
1100
1101         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1102                                 name, S_ISDIR(attr->la_mode), handle,
1103                                 mdd_object_capa(env, mdd_pobj));
1104
1105         if (rc)
1106                 GOTO(cleanup, rc);
1107
1108         inserted = 1;
1109         /* replay creates has objects already */
1110         if (spec->u.sp_ea.no_lov_create) {
1111                 CDEBUG(D_INFO, "we already have lov ea\n");
1112                 LASSERT(lmm != NULL);
1113                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1114                 lmm_size = spec->u.sp_ea.eadatalen;
1115         }
1116         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1117         if (rc) {
1118                 CERROR("error on stripe info copy %d \n", rc);
1119                 GOTO(cleanup, rc);
1120         }
1121         if (lmm && lmm_size > 0) {
1122                 /* set Lov here, do not get lmm again later */
1123                 memcpy(ma->ma_lmm, lmm, lmm_size); 
1124                 ma->ma_lmm_size = lmm_size;
1125                 ma->ma_valid |= MA_LOV; 
1126         }
1127
1128         if (S_ISLNK(attr->la_mode)) {
1129                 struct dt_object *dt = mdd_object_child(son);
1130                 const char *target_name = spec->u.sp_symname;
1131                 int sym_len = strlen(target_name);
1132                 const struct lu_buf *buf;
1133                 loff_t pos = 0;
1134
1135                 buf = mdd_buf_get_const(env, target_name, sym_len);
1136                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1137                                                 mdd_object_capa(env, son));
1138                 if (rc == sym_len)
1139                         rc = 0;
1140                 else
1141                         rc = -EFAULT;
1142         }
1143
1144         *la_copy = ma->ma_attr;
1145         la_copy->la_valid = LA_CTIME | LA_MTIME;
1146         rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
1147         if (rc)
1148                 GOTO(cleanup, rc);
1149
1150         /* return attr back */
1151         rc = mdd_attr_get_internal_locked(env, son, ma);
1152 cleanup:
1153         if (rc && created) {
1154                 int rc2 = 0;
1155
1156                 if (inserted) {
1157                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1158                                                  S_ISDIR(attr->la_mode),
1159                                                  handle, BYPASS_CAPA);
1160                         if (rc2)
1161                                 CERROR("error can not cleanup destroy %d\n",
1162                                        rc2);
1163                 }
1164                 if (rc2 == 0) {
1165                         mdd_write_lock(env, son);
1166                         mdd_ref_del_internal(env, son, handle);
1167                         mdd_write_unlock(env, son);
1168                 }
1169         }
1170         /* finish mdd_lov_create() stuff */
1171         mdd_lov_create_finish(env, mdd, rc);
1172         if (lmm)
1173                 OBD_FREE(lmm, lmm_size);
1174         mdd_write_unlock(env, mdd_pobj);
1175         mdd_trans_stop(env, mdd, rc, handle);
1176         mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1177         RETURN(rc);
1178 }
1179
1180 static int mdd_rename_lock(const struct lu_env *env,
1181                            struct mdd_device *mdd,
1182                            struct mdd_object *src_pobj,
1183                            struct mdd_object *tgt_pobj)
1184 {
1185         int rc;
1186         ENTRY;
1187
1188         if (src_pobj == tgt_pobj) {
1189                 mdd_write_lock(env, src_pobj);
1190                 RETURN(0);
1191         }
1192
1193         /* compared the parent child relationship of src_p&tgt_p */
1194         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1195                 mdd_lock2(env, src_pobj, tgt_pobj);
1196                 RETURN(0);
1197         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1198                 mdd_lock2(env, tgt_pobj, src_pobj);
1199                 RETURN(0);
1200         }
1201
1202         rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1203         if (rc < 0)
1204                 RETURN(rc);
1205
1206         if (rc == 1) {
1207                 mdd_lock2(env, tgt_pobj, src_pobj);
1208                 RETURN(0);
1209         }
1210
1211         mdd_lock2(env, src_pobj, tgt_pobj);
1212         RETURN(0);
1213 }
1214
1215 static void mdd_rename_unlock(const struct lu_env *env,
1216                               struct mdd_object *src_pobj,
1217                               struct mdd_object *tgt_pobj)
1218 {
1219         mdd_write_unlock(env, src_pobj);
1220         if (src_pobj != tgt_pobj)
1221                 mdd_write_unlock(env, tgt_pobj);
1222 }
1223
1224 static int mdd_rename_sanity_check(const struct lu_env *env,
1225                                    struct mdd_object *src_pobj,
1226                                    struct mdd_object *tgt_pobj,
1227                                    const struct lu_fid *sfid,
1228                                    int src_is_dir,
1229                                    struct mdd_object *tobj)
1230 {
1231         int rc;
1232         ENTRY;
1233
1234         if (mdd_is_dead_obj(src_pobj))
1235                 RETURN(-ENOENT);
1236
1237         /* The sobj maybe on the remote, check parent permission only here */
1238         rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
1239         if (rc)
1240                 RETURN(rc);
1241
1242         if (!tobj) {
1243                 rc = mdd_may_create(env, tgt_pobj, NULL,
1244                                     (src_pobj != tgt_pobj));
1245         } else {
1246                 mdd_read_lock(env, tobj);
1247                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1248                                     (src_pobj != tgt_pobj));
1249                 if (rc == 0)
1250                         if (S_ISDIR(mdd_object_type(tobj))
1251                             && mdd_dir_is_empty(env, tobj))
1252                                 rc = -ENOTEMPTY;
1253                 mdd_read_unlock(env, tobj);
1254         }
1255
1256         RETURN(rc);
1257 }
1258 /* src object can be remote that is why we use only fid and type of object */
1259 static int mdd_rename(const struct lu_env *env,
1260                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1261                       const struct lu_fid *lf, const char *sname,
1262                       struct md_object *tobj, const char *tname,
1263                       struct md_attr *ma)
1264 {
1265         struct mdd_device *mdd = mdo2mdd(src_pobj);
1266         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1267         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1268         struct mdd_object *mdd_sobj = NULL;
1269         struct mdd_object *mdd_tobj = NULL;
1270         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1271         struct thandle *handle;
1272         int is_dir;
1273         int rc;
1274         ENTRY;
1275
1276         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1277         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1278         if (ma->ma_attr.la_valid & LA_FLAGS &&
1279             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1280                 RETURN(-EPERM);
1281
1282         if (tobj)
1283                 mdd_tobj = md2mdd_obj(tobj);
1284
1285         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1286         handle = mdd_trans_start(env, mdd);
1287         if (IS_ERR(handle))
1288                 RETURN(PTR_ERR(handle));
1289
1290         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1291         rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
1292         if (rc)
1293                 GOTO(cleanup_unlocked, rc);
1294
1295         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1296                                      lf, is_dir, mdd_tobj);
1297         if (rc)
1298                 GOTO(cleanup, rc);
1299
1300         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1301                                 mdd_object_capa(env, mdd_spobj));
1302         if (rc)
1303                 GOTO(cleanup, rc);
1304
1305         /*
1306          * Here tobj can be remote one, so we do index_delete unconditionally
1307          * and -ENOENT is allowed.
1308          */
1309         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1310                                 mdd_object_capa(env, mdd_tpobj));
1311         if (rc != 0 && rc != -ENOENT)
1312                 GOTO(cleanup, rc);
1313
1314         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1315                                 mdd_object_capa(env, mdd_tpobj));
1316         if (rc)
1317                 GOTO(cleanup, rc);
1318
1319         mdd_sobj = mdd_object_find(env, mdd, lf);
1320         *la_copy = ma->ma_attr;
1321         la_copy->la_valid = LA_CTIME;
1322         if (mdd_sobj) {
1323                 /*XXX: how to update ctime for remote sobj? */
1324                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
1325                 if (rc)
1326                         GOTO(cleanup, rc);
1327         }
1328         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1329                 mdd_write_lock(env, mdd_tobj);
1330                 mdd_ref_del_internal(env, mdd_tobj, handle);
1331                 /* remove dot reference */
1332                 if (is_dir)
1333                         mdd_ref_del_internal(env, mdd_tobj, handle);
1334
1335                 la_copy->la_valid = LA_CTIME;
1336                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1337                 if (rc)
1338                         GOTO(cleanup, rc);
1339
1340                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1341                 mdd_write_unlock(env, mdd_tobj);
1342                 if (rc)
1343                         GOTO(cleanup, rc);
1344         }
1345
1346         la_copy->la_valid = LA_CTIME | LA_MTIME;
1347         rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
1348         if (rc)
1349                 GOTO(cleanup, rc);
1350
1351         if (mdd_spobj != mdd_tpobj) {
1352                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1353                 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
1354         }
1355
1356 cleanup:
1357         mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
1358 cleanup_unlocked:
1359         mdd_trans_stop(env, mdd, rc, handle);
1360         if (mdd_sobj)
1361                 mdd_object_put(env, mdd_sobj);
1362         RETURN(rc);
1363 }
1364
1365 struct md_dir_operations mdd_dir_ops = {
1366         .mdo_is_subdir     = mdd_is_subdir,
1367         .mdo_lookup        = mdd_lookup,
1368         .mdo_create        = mdd_create,
1369         .mdo_rename        = mdd_rename,
1370         .mdo_link          = mdd_link,
1371         .mdo_unlink        = mdd_unlink,
1372         .mdo_name_insert   = mdd_name_insert,
1373         .mdo_name_remove   = mdd_name_remove,
1374         .mdo_rename_tgt    = mdd_rename_tgt,
1375         .mdo_create_data   = mdd_create_data
1376 };