Whamcloud - gitweb
8621f69f3f8637ba1077abe6162aab5365ca6c4d
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check) {
200                 rc = mdd_permission_internal_locked(env, pobj,
201                                                     (MAY_WRITE | MAY_EXEC));
202         }
203         RETURN(rc);
204 }
205
206 /*
207  * It's inline, so penalty for filesystems that don't use sticky bit is
208  * minimal.
209  */
210 static inline int mdd_is_sticky(const struct lu_env *env,
211                                 struct mdd_object *pobj,
212                                 struct mdd_object *cobj)
213 {
214         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
215         struct md_ucred *uc = md_ucred(env);
216         int rc;
217
218         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
219         if (rc) {
220                 return rc;
221         } else if (tmp_la->la_uid == uc->mu_fsuid) {
222                 return 0;
223         } else {
224                 mdd_read_lock(env, pobj);
225                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
226                 mdd_read_unlock(env, pobj);
227                 if (rc)
228                         return rc;
229                 else if (!(tmp_la->la_mode & S_ISVTX))
230                         return 0;
231                 else if (tmp_la->la_uid == uc->mu_fsuid)
232                         return 0;
233                 else
234                         return !mdd_capable(uc, CAP_FOWNER);
235         }
236 }
237
238 /* Check whether it may delete the cobj under the pobj. */
239 static int mdd_may_delete(const struct lu_env *env,
240                           struct mdd_object *pobj,
241                           struct mdd_object *cobj,
242                           int is_dir, int need_check)
243 {
244         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
245         int rc = 0;
246         ENTRY;
247
248         LASSERT(cobj);
249
250         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
251                 RETURN(-ENOENT);
252
253         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
254                 RETURN(-EPERM);
255
256         if (is_dir) {
257                 if (!S_ISDIR(mdd_object_type(cobj)))
258                         RETURN(-ENOTDIR);
259
260                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
261                         RETURN(-EBUSY);
262
263         } else if (S_ISDIR(mdd_object_type(cobj))) {
264                 RETURN(-EISDIR);
265         }
266
267         if (pobj) {
268                 if (mdd_is_dead_obj(pobj))
269                         RETURN(-ENOENT);
270
271                 if (mdd_is_sticky(env, pobj, cobj))
272                         RETURN(-EPERM);
273
274                 if (need_check)
275                         rc = mdd_permission_internal_locked(env, pobj,
276                                                             MAY_WRITE |
277                                                             MAY_EXEC);
278         }
279         RETURN(rc);
280 }
281
282 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
283                           struct mdd_object *src_obj)
284 {
285         int rc = 0;
286         ENTRY;
287
288         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
289                 RETURN(-EPERM);
290
291         if (S_ISDIR(mdd_object_type(src_obj)))
292                 RETURN(-EPERM);
293
294         LASSERT(src_obj != tgt_obj);
295         if (tgt_obj) {
296                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
297                 if (rc)
298                         RETURN(rc);
299         }
300
301         RETURN(rc);
302 }
303
304 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
305                                    const struct lu_fid *fid)
306 {
307         struct mdd_thread_info *info = mdd_env_info(env);
308
309         fid_cpu_to_be(&info->mti_fid2, fid);
310         return (const struct dt_rec *)&info->mti_fid2;
311 }
312
313
314 /* insert new index, add reference if isdir, update times */
315 static int __mdd_index_insert(const struct lu_env *env,
316                              struct mdd_object *pobj, const struct lu_fid *lf,
317                              const char *name, int isdir, struct thandle *th,
318                              struct lustre_capa *capa)
319 {
320         struct dt_object *next = mdd_object_child(pobj);
321         struct timeval start;
322         int rc;
323         ENTRY;
324
325         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
326                              LPROC_MDD_INDEX_INSERT);
327 #if 0
328         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
329 #endif
330
331         if (dt_try_as_dir(env, next))
332                 rc = next->do_index_ops->dio_insert(env, next,
333                                                     __mdd_fid_rec(env, lf),
334                                                     (const struct dt_key *)name,
335                                                     th, capa);
336         else
337                 rc = -ENOTDIR;
338
339         if (rc == 0) {
340                 if (isdir) {
341                         mdd_write_lock(env, pobj);
342                         mdd_ref_add_internal(env, pobj, th);
343                         mdd_write_unlock(env, pobj);
344                 }
345 #if 0
346                 la->la_valid = LA_MTIME|LA_CTIME;
347                 la->la_atime = ma->ma_attr.la_atime;
348                 la->la_ctime = ma->ma_attr.la_ctime;
349                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
350 #endif
351         }
352         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
353                            LPROC_MDD_INDEX_INSERT);
354         return rc;
355 }
356
357 static int __mdd_index_delete(const struct lu_env *env,
358                               struct mdd_object *pobj, const char *name,
359                               int is_dir, struct thandle *handle,
360                               struct lustre_capa *capa)
361 {
362         struct dt_object *next = mdd_object_child(pobj);
363         struct timeval start;
364         int rc;
365         ENTRY;
366
367         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
368                              LPROC_MDD_INDEX_DELETE);
369         if (dt_try_as_dir(env, next)) {
370                 rc = next->do_index_ops->dio_delete(env, next,
371                                                     (struct dt_key *)name,
372                                                     handle, capa);
373                 if (rc == 0 && is_dir) {
374                         mdd_write_lock(env, pobj);
375                         mdd_ref_del_internal(env, pobj, handle);
376                         mdd_write_unlock(env, pobj);
377                 }
378         } else
379                 rc = -ENOTDIR;
380         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
381                            LPROC_MDD_INDEX_DELETE);
382         RETURN(rc);
383 }
384
385 static int __mdd_index_insert_only(const struct lu_env *env,
386                                    struct mdd_object *pobj,
387                                    const struct lu_fid *lf,
388                                    const char *name, struct thandle *th,
389                                    struct lustre_capa *capa)
390 {
391         struct dt_object *next = mdd_object_child(pobj);
392         int rc;
393         ENTRY;
394
395         if (dt_try_as_dir(env, next)) {
396                 rc = next->do_index_ops->dio_insert(env, next,
397                                                     __mdd_fid_rec(env, lf),
398                                                     (const struct dt_key *)name,
399                                                     th, capa);
400         } else {
401                 rc = -ENOTDIR;
402         }
403         RETURN(rc);
404 }
405
406 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
407                     struct md_object *src_obj, const char *name,
408                     struct md_attr *ma)
409 {
410         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
411         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
412         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
413         struct mdd_device *mdd = mdo2mdd(src_obj);
414         struct dynlock_handle *dlh;
415         struct thandle *handle;
416         int rc;
417         ENTRY;
418
419         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
420         handle = mdd_trans_start(env, mdd);
421         if (IS_ERR(handle))
422                 RETURN(PTR_ERR(handle));
423
424         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
425         if (dlh == NULL)
426                 GOTO(out_trans, rc = -ENOMEM);
427         mdd_write_lock(env, mdd_sobj);
428
429         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
430         if (rc)
431                 GOTO(out_unlock, rc);
432
433         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
434                                      name, handle,
435                                      mdd_object_capa(env, mdd_tobj));
436         if (rc)
437                 GOTO(out_unlock, rc);
438         
439         mdd_ref_add_internal(env, mdd_sobj, handle);
440
441         *la_copy = ma->ma_attr;
442         la_copy->la_valid = LA_CTIME;
443         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
444         if (rc)
445                 GOTO(out_unlock, rc);
446
447         la_copy->la_valid = LA_CTIME | LA_MTIME;
448         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
449
450 out_unlock:
451         mdd_write_unlock(env, mdd_sobj);
452         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
453 out_trans:
454         mdd_trans_stop(env, mdd, rc, handle);
455         RETURN(rc);
456 }
457
458 /* caller should take a lock before calling */
459 int mdd_finish_unlink(const struct lu_env *env,
460                       struct mdd_object *obj, struct md_attr *ma,
461                       struct thandle *th)
462 {
463         int rc;
464         ENTRY;
465
466         rc = mdd_iattr_get(env, obj, ma);
467         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
468                 /* add new orphan and the object
469                  * will be deleted during the object_put() */
470                 if (__mdd_orphan_add(env, obj, th) == 0)
471                         obj->mod_flags |= ORPHAN_OBJ;
472
473                 if (obj->mod_count == 0)
474                         rc = mdd_object_kill(env, obj, ma);
475                 else
476                         /* clear MA_LOV | MA_COOKIE, if we do not
477                          * unlink it in case we get it somewhere */
478                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
479         } else
480                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
481
482         RETURN(rc);
483 }
484
485 /*
486  * Check that @dir contains no entries except (possibly) dot and dotdot.
487  *
488  * Returns:
489  *
490  *             0        empty
491  *    -ENOTEMPTY        not empty
492  *           -ve        other error
493  *
494  */
495 static int mdd_dir_is_empty(const struct lu_env *env,
496                             struct mdd_object *dir)
497 {
498         struct dt_it     *it;
499         struct dt_object *obj;
500         struct dt_it_ops *iops;
501         int result;
502         ENTRY;
503
504         obj = mdd_object_child(dir);
505         iops = &obj->do_index_ops->dio_it;
506         it = iops->init(env, obj, 0, BYPASS_CAPA);
507         if (it != NULL) {
508                 result = iops->get(env, it, (const void *)"");
509                 if (result > 0) {
510                         int i;
511                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
512                                 result = iops->next(env, it);
513                         if (result == 0)
514                                 result = -ENOTEMPTY;
515                         else if (result == +1)
516                                 result = 0;
517                 } else if (result == 0)
518                         /*
519                          * Huh? Index contains no zero key?
520                          */
521                         result = -EIO;
522
523                 iops->put(env, it);
524                 iops->fini(env, it);
525         } else
526                 result = -ENOMEM;
527         RETURN(result);
528 }
529
530 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
531                             struct mdd_object *cobj, struct md_attr *ma)
532 {
533         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
534         int rc = 0;
535         ENTRY;
536
537         rc = mdd_may_delete(env, pobj, cobj,
538                             S_ISDIR(ma->ma_attr.la_mode), 1);
539         if (rc)
540                 RETURN(rc);
541
542         if (S_ISDIR(mdd_object_type(cobj))) {
543                 if (dt_try_as_dir(env, dt_cobj))
544                         rc = mdd_dir_is_empty(env, cobj);
545                 else
546                         rc = -ENOTDIR;
547         }
548
549         RETURN(rc);
550 }
551
552 static int mdd_unlink(const struct lu_env *env,
553                       struct md_object *pobj, struct md_object *cobj,
554                       const char *name, struct md_attr *ma)
555 {
556         struct mdd_device *mdd = mdo2mdd(pobj);
557         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
558         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
559         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
560         struct thandle    *handle;
561         struct dynlock_handle *dlh;
562         int rc, is_dir;
563         ENTRY;
564
565         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
566         if (rc)
567                 RETURN(rc);
568
569         handle = mdd_trans_start(env, mdd);
570         if (IS_ERR(handle))
571                 RETURN(PTR_ERR(handle));
572
573         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
574         if (dlh == NULL)
575                 GOTO(out_trans, rc = -ENOMEM);
576         mdd_write_lock(env, mdd_cobj);
577
578         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
579         if (rc)
580                 GOTO(cleanup, rc);
581
582         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
583         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
584                                 mdd_object_capa(env, mdd_pobj));
585         if (rc)
586                 GOTO(cleanup, rc);
587
588         mdd_ref_del_internal(env, mdd_cobj, handle);
589         *la_copy = ma->ma_attr;
590         if (is_dir) {
591                 /* unlink dot */
592                 mdd_ref_del_internal(env, mdd_cobj, handle);
593         } else {
594                 la_copy->la_valid = LA_CTIME;
595                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
596                 if (rc)
597                         GOTO(cleanup, rc);
598         }
599
600         la_copy->la_valid = LA_CTIME | LA_MTIME;
601         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
602         if (rc)
603                 GOTO(cleanup, rc);
604
605         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
606
607         if (rc == 0)
608                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
609                                    strlen("unlinked"), "unlinked", 0,
610                                    NULL, NULL);
611 cleanup:
612         mdd_write_unlock(env, mdd_cobj);
613         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
614 out_trans:
615         mdd_trans_stop(env, mdd, rc, handle);
616         RETURN(rc);
617 }
618
619 /*
620  * Partial operation. Be aware, this is called with write lock taken, so we use
621  * locksless version of __mdd_lookup() here.
622  */
623 static int mdd_ni_sanity_check(const struct lu_env *env,
624                                struct md_object *pobj,
625                                const char *name,
626                                const struct lu_fid *fid)
627 {
628         struct mdd_object *obj       = md2mdd_obj(pobj);
629 #if 0
630         int rc;
631 #endif
632         ENTRY;
633
634         /* EEXIST check */
635         if (mdd_is_dead_obj(obj))
636                 RETURN(-ENOENT);
637
638          /* The exist of the name will be checked in _index_insert. */
639 #if 0
640         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
641         if (rc != -ENOENT)
642                 RETURN(rc ? : -EEXIST);
643         else
644                 RETURN(0);
645 #endif
646         RETURN(mdd_permission_internal_locked(env, obj,
647                                               MAY_WRITE | MAY_EXEC));
648 }
649
650 static int mdd_name_insert(const struct lu_env *env,
651                            struct md_object *pobj,
652                            const char *name, const struct lu_fid *fid,
653                            int isdir)
654 {
655         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
656         struct mdd_device *mdd = mdo2mdd(pobj);
657         struct thandle *handle;
658         struct dynlock_handle *dlh;
659         int rc;
660         ENTRY;
661
662         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
663         handle = mdd_trans_start(env, mdo2mdd(pobj));
664         if (IS_ERR(handle))
665                 RETURN(PTR_ERR(handle));
666
667         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
668         if (dlh == NULL)
669                 GOTO(out_trans, rc = -ENOMEM);
670         rc = mdd_ni_sanity_check(env, pobj, name, fid);
671         if (rc)
672                 GOTO(out_unlock, rc);
673
674         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
675                                 BYPASS_CAPA);
676
677         EXIT;
678 out_unlock:
679         mdd_pdo_write_unlock(env, mdd_obj, dlh);
680 out_trans:
681         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
682         return rc;
683 }
684
685 /*
686  * Be aware, this is called with write lock taken, so we use locksless version
687  * of __mdd_lookup() here.
688  */
689 static int mdd_nr_sanity_check(const struct lu_env *env,
690                                struct md_object *pobj,
691                                const char *name)
692 {
693         struct mdd_object *obj       = md2mdd_obj(pobj);
694 #if 0
695         struct mdd_thread_info *info = mdd_env_info(env);
696         struct lu_fid     *fid       = &info->mti_fid;
697         int rc;
698 #endif
699         ENTRY;
700
701         /* EEXIST check */
702         if (mdd_is_dead_obj(obj))
703                 RETURN(-ENOENT);
704
705          /* The exist of the name will be checked in _index_delete. */
706 #if 0
707         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
708         RETURN(rc);
709 #endif
710         RETURN(mdd_permission_internal_locked(env, obj,
711                                               MAY_WRITE | MAY_EXEC));
712 }
713
714 static int mdd_name_remove(const struct lu_env *env,
715                            struct md_object *pobj,
716                            const char *name, int is_dir)
717 {
718         struct mdd_device *mdd = mdo2mdd(pobj);
719         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
720         struct thandle *handle;
721         struct dynlock_handle *dlh;
722         int rc;
723         ENTRY;
724
725         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
726         handle = mdd_trans_start(env, mdd);
727         if (IS_ERR(handle))
728                 RETURN(PTR_ERR(handle));
729
730         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
731         if (dlh == NULL)
732                 GOTO(out_trans, rc = -ENOMEM);
733         rc = mdd_nr_sanity_check(env, pobj, name);
734         if (rc)
735                 GOTO(out_unlock, rc);
736
737         rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
738                                 BYPASS_CAPA);
739
740 out_unlock:
741         mdd_pdo_write_unlock(env, mdd_obj, dlh);
742 out_trans:
743         mdd_trans_stop(env, mdd, rc, handle);
744         RETURN(rc);
745 }
746
747 static int mdd_rt_sanity_check(const struct lu_env *env,
748                                struct mdd_object *tgt_pobj,
749                                struct mdd_object *tobj,
750                                const struct lu_fid *sfid,
751                                const char *name, struct md_attr *ma)
752 {
753         int rc, src_is_dir;
754         ENTRY;
755
756         /* EEXIST check */
757         if (mdd_is_dead_obj(tgt_pobj))
758                 RETURN(-ENOENT);
759
760         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
761         if (tobj) {
762                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
763                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
764                      mdd_dir_is_empty(env, tobj))
765                                 RETURN(-ENOTEMPTY);
766         } else {
767                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
768         }
769
770         RETURN(rc);
771 }
772
773 static int mdd_rename_tgt(const struct lu_env *env,
774                           struct md_object *pobj, struct md_object *tobj,
775                           const struct lu_fid *lf, const char *name,
776                           struct md_attr *ma)
777 {
778         struct mdd_device *mdd = mdo2mdd(pobj);
779         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
780         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
781         struct thandle *handle;
782         struct dynlock_handle *dlh;
783         int rc;
784         ENTRY;
785
786         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
787         handle = mdd_trans_start(env, mdd);
788         if (IS_ERR(handle))
789                 RETURN(PTR_ERR(handle));
790
791         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
792         if (dlh == NULL)
793                 GOTO(out_trans, rc = -ENOMEM);
794         if (mdd_tobj)
795                 mdd_write_lock(env, mdd_tobj);
796
797         /*TODO rename sanity checking*/
798         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
799         if (rc)
800                 GOTO(cleanup, rc);
801
802         /* if rename_tgt is called then we should just re-insert name with
803          * correct fid, no need to dec/inc parent nlink if obj is dir */
804         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
805         if (rc)
806                 GOTO(cleanup, rc);
807
808         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
809                                      BYPASS_CAPA);
810         if (rc)
811                 GOTO(cleanup, rc);
812
813         if (tobj && lu_object_exists(&tobj->mo_lu))
814                 mdd_ref_del_internal(env, mdd_tobj, handle);
815 cleanup:
816         if (tobj)
817                 mdd_write_unlock(env, mdd_tobj);
818         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
819 out_trans:
820         mdd_trans_stop(env, mdd, rc, handle);
821         RETURN(rc);
822 }
823
824 /*
825  * The permission has been checked when obj created,
826  * no need check again.
827  */
828 static int mdd_cd_sanity_check(const struct lu_env *env,
829                                struct mdd_object *obj)
830 {
831         int rc = 0;
832         ENTRY;
833
834         /* EEXIST check */
835         if (!obj || mdd_is_dead_obj(obj))
836                 RETURN(-ENOENT);
837
838 #if 0
839         mdd_read_lock(env, obj);
840         rc = mdd_permission_internal(env, obj, MAY_WRITE);
841         mdd_read_unlock(env, obj);
842 #endif
843
844         RETURN(rc);
845
846 }
847
848 static int mdd_create_data(const struct lu_env *env,
849                            struct md_object *pobj, struct md_object *cobj,
850                            const struct md_create_spec *spec,
851                            struct md_attr *ma)
852 {
853         struct mdd_device *mdd = mdo2mdd(cobj);
854         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
855         struct mdd_object *son = md2mdd_obj(cobj);
856         struct lu_attr    *attr = &ma->ma_attr;
857         struct lov_mds_md *lmm = NULL;
858         int                lmm_size = 0;
859         struct thandle    *handle;
860         int                rc;
861         ENTRY;
862
863         rc = mdd_cd_sanity_check(env, son);
864         if (rc)
865                 RETURN(rc);
866
867         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
868                         !(spec->sp_cr_flags & FMODE_WRITE))
869                 RETURN(0);
870         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
871                             attr);
872         if (rc)
873                 RETURN(rc);
874
875         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
876         handle = mdd_trans_start(env, mdd);
877         if (IS_ERR(handle))
878                 RETURN(rc = PTR_ERR(handle));
879
880         /*
881          * XXX: Setting the lov ea is not locked but setting the attr is locked?
882          */
883
884         /* Replay creates has objects already */
885         if (spec->u.sp_ea.no_lov_create) {
886                 CDEBUG(D_INFO, "we already have lov ea\n");
887                 rc = mdd_lov_set_md(env, mdd_pobj, son,
888                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
889                                     spec->u.sp_ea.eadatalen, handle, 0);
890         } else
891                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
892                                     lmm_size, handle, 0);
893
894         if (rc == 0)
895                rc = mdd_attr_get_internal_locked(env, son, ma);
896
897         /* Finish mdd_lov_create() stuff. */
898         mdd_lov_create_finish(env, mdd, rc);
899         mdd_trans_stop(env, mdd, rc, handle);
900         if (lmm)
901                 OBD_FREE(lmm, lmm_size);
902         RETURN(rc);
903 }
904
905 static int
906 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
907              const char *name, struct lu_fid* fid, int mask)
908 {
909         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
910         struct dt_object    *dir = mdd_object_child(mdd_obj);
911         struct dt_rec       *rec = (struct dt_rec *)fid;
912         const struct dt_key *key = (const struct dt_key *)name;
913         struct timeval       start;
914         int rc;
915         ENTRY;
916
917         mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
918         if (mdd_is_dead_obj(mdd_obj))
919                 RETURN(-ESTALE);
920
921         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
922         if (rc == 0)
923                 RETURN(-ESTALE);
924         else if (rc < 0) {
925                 CERROR("Object "DFID" locates on remote server\n",
926                         PFID(mdo2fid(mdd_obj)));
927                 LBUG();
928         }
929
930 #if 0
931         if (mask == MAY_EXEC)
932                 rc = mdd_exec_permission_lite(env, mdd_obj);
933         else
934 #endif
935         rc = mdd_permission_internal_locked(env, mdd_obj, mask);
936         if (rc)
937                 RETURN(rc);
938
939         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
940                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
941                                                    mdd_object_capa(env, mdd_obj));
942                 if (rc == 0)
943                         fid_be_to_cpu(fid, fid);
944         } else
945                 rc = -ENOTDIR;
946
947         mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
948         RETURN(rc);
949 }
950
951 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
952                           struct mdd_object *child, struct md_attr *ma,
953                           struct thandle *handle)
954 {
955         int rc;
956         ENTRY;
957
958         /* update attributes for child.
959          * FIXME:
960          *  (1) the valid bits should be converted between Lustre and Linux;
961          *  (2) maybe, the child attributes should be set in OSD when creation.
962          */
963
964         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
965         if (rc != 0)
966                 RETURN(rc);
967
968         if (S_ISDIR(ma->ma_attr.la_mode)) {
969                 /* add . and .. for newly created dir */
970                 mdd_ref_add_internal(env, child, handle);
971                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
972                                              dot, handle, BYPASS_CAPA);
973                 if (rc == 0) {
974                         rc = __mdd_index_insert_only(env, child, pfid,
975                                                      dotdot, handle,
976                                                      BYPASS_CAPA);
977                         if (rc != 0) {
978                                 int rc2;
979
980                                 rc2 = __mdd_index_delete(env, child, dot, 0,
981                                                          handle, BYPASS_CAPA);
982                                 if (rc2 != 0)
983                                         CERROR("Failure to cleanup after dotdot"
984                                                " creation: %d (%d)\n", rc2, rc);
985                                 else
986                                         mdd_ref_del_internal(env, child, handle);
987                         }
988                 }
989         }
990         RETURN(rc);
991 }
992
993 static int mdd_create_sanity_check(const struct lu_env *env,
994                                    struct md_object *pobj,
995                                    const char *name, struct md_attr *ma)
996 {
997         struct mdd_thread_info *info = mdd_env_info(env);
998         struct lu_attr    *la        = &info->mti_la;
999         struct lu_fid     *fid       = &info->mti_fid;
1000         struct mdd_object *obj       = md2mdd_obj(pobj);
1001         int rc;
1002         ENTRY;
1003
1004         /* EEXIST check */
1005         if (mdd_is_dead_obj(obj))
1006                 RETURN(-ENOENT);
1007
1008         /*
1009          * Check if the name already exist, though it will be checked
1010          * in _index_insert also, for avoiding rolling back if exists
1011          * _index_insert.
1012          */
1013         rc = __mdd_lookup_locked(env, pobj, name, fid,
1014                                  MAY_WRITE | MAY_EXEC);
1015         if (rc != -ENOENT)
1016                 RETURN(rc ? : -EEXIST);
1017
1018         /* sgid check */
1019         mdd_read_lock(env, obj);
1020         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1021         mdd_read_unlock(env, obj);
1022         if (rc != 0)
1023                 RETURN(rc);
1024
1025         if (la->la_mode & S_ISGID) {
1026                 ma->ma_attr.la_gid = la->la_gid;
1027                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1028                         ma->ma_attr.la_mode |= S_ISGID;
1029                         ma->ma_attr.la_valid |= LA_MODE;
1030                 }
1031         }
1032
1033         switch (ma->ma_attr.la_mode & S_IFMT) {
1034         case S_IFREG:
1035         case S_IFDIR:
1036         case S_IFLNK:
1037         case S_IFCHR:
1038         case S_IFBLK:
1039         case S_IFIFO:
1040         case S_IFSOCK:
1041                 rc = 0;
1042                 break;
1043         default:
1044                 rc = -EINVAL;
1045                 break;
1046         }
1047         RETURN(rc);
1048 }
1049
1050 /*
1051  * Create object and insert it into namespace.
1052  */
1053 static int mdd_create(const struct lu_env *env,
1054                       struct md_object *pobj, const char *name,
1055                       struct md_object *child,
1056                       struct md_create_spec *spec,
1057                       struct md_attr* ma)
1058 {
1059         struct mdd_device *mdd = mdo2mdd(pobj);
1060         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1061         struct mdd_object *son = md2mdd_obj(child);
1062         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1063         struct lu_attr    *attr = &ma->ma_attr;
1064         struct lov_mds_md *lmm = NULL;
1065         struct thandle    *handle;
1066         int rc, created = 0, inserted = 0, lmm_size = 0;
1067         struct dynlock_handle *dlh;
1068         struct timeval  start;
1069         ENTRY;
1070
1071         mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1072         /*
1073          * Two operations have to be performed:
1074          *
1075          *  - allocation of new object (->do_create()), and
1076          *
1077          *  - insertion into parent index (->dio_insert()).
1078          *
1079          * Due to locking, operation order is not important, when both are
1080          * successful, *but* error handling cases are quite different:
1081          *
1082          *  - if insertion is done first, and following object creation fails,
1083          *  insertion has to be rolled back, but this operation might fail
1084          *  also leaving us with dangling index entry.
1085          *
1086          *  - if creation is done first, is has to be undone if insertion
1087          *  fails, leaving us with leaked space, which is neither good, nor
1088          *  fatal.
1089          *
1090          * It seems that creation-first is simplest solution, but it is
1091          * sub-optimal in the frequent
1092          *
1093          *         $ mkdir foo
1094          *         $ mkdir foo
1095          *
1096          * case, because second mkdir is bound to create object, only to
1097          * destroy it immediately.
1098          *
1099          * To avoid this follow local file systems that do double lookup:
1100          *
1101          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1102          *
1103          *     1. create            (mdd_object_create_internal())
1104          *
1105          *     2. insert            (__mdd_index_insert(), lookup again)
1106          */
1107
1108         /* sanity checks before big job */
1109         rc = mdd_create_sanity_check(env, pobj, name, ma);
1110         if (rc)
1111                 RETURN(rc);
1112
1113         /* no RPC inside the transaction, so OST objects should be created at
1114          * first */
1115         if (S_ISREG(attr->la_mode)) {
1116                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1117                                     spec, attr);
1118                 if (rc)
1119                         RETURN(rc);
1120         }
1121
1122         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1123         handle = mdd_trans_start(env, mdd);
1124         if (IS_ERR(handle))
1125                 RETURN(PTR_ERR(handle));
1126
1127         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1128         if (dlh == NULL)
1129                 GOTO(out_trans, rc = -ENOMEM);
1130
1131         /*
1132          * XXX check that link can be added to the parent in mkdir case.
1133          */
1134
1135         mdd_write_lock(env, son);
1136         rc = mdd_object_create_internal(env, son, ma, handle);
1137         if (rc) {
1138                 mdd_write_unlock(env, son);
1139                 GOTO(cleanup, rc);
1140         }
1141
1142         created = 1;
1143
1144 #ifdef CONFIG_FS_POSIX_ACL
1145         mdd_read_lock(env, mdd_pobj);
1146         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1147         mdd_read_unlock(env, mdd_pobj);
1148         if (rc) {
1149                 mdd_write_unlock(env, son);
1150                 GOTO(cleanup, rc);
1151         } else {
1152                 ma->ma_attr.la_valid |= LA_MODE;
1153         }
1154 #endif
1155
1156         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1157                                    son, ma, handle);
1158         mdd_write_unlock(env, son);
1159         if (rc)
1160                 /*
1161                  * Object has no links, so it will be destroyed when last
1162                  * reference is released. (XXX not now.)
1163                  */
1164                 GOTO(cleanup, rc);
1165
1166         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1167                                 name, S_ISDIR(attr->la_mode), handle,
1168                                 mdd_object_capa(env, mdd_pobj));
1169
1170         if (rc)
1171                 GOTO(cleanup, rc);
1172
1173         inserted = 1;
1174         /* replay creates has objects already */
1175         if (spec->u.sp_ea.no_lov_create) {
1176                 CDEBUG(D_INFO, "we already have lov ea\n");
1177                 LASSERT(lmm == NULL);
1178                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1179                 lmm_size = spec->u.sp_ea.eadatalen;
1180         }
1181         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1182         if (rc) {
1183                 CERROR("error on stripe info copy %d \n", rc);
1184                 GOTO(cleanup, rc);
1185         }
1186         if (lmm && lmm_size > 0) {
1187                 /* set Lov here, do not get lmm again later */
1188                 memcpy(ma->ma_lmm, lmm, lmm_size);
1189                 ma->ma_lmm_size = lmm_size;
1190                 ma->ma_valid |= MA_LOV;
1191         }
1192
1193         if (S_ISLNK(attr->la_mode)) {
1194                 struct dt_object *dt = mdd_object_child(son);
1195                 const char *target_name = spec->u.sp_symname;
1196                 int sym_len = strlen(target_name);
1197                 const struct lu_buf *buf;
1198                 loff_t pos = 0;
1199
1200                 buf = mdd_buf_get_const(env, target_name, sym_len);
1201                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1202                                                 mdd_object_capa(env, son));
1203                 if (rc == sym_len)
1204                         rc = 0;
1205                 else
1206                         rc = -EFAULT;
1207         }
1208
1209         *la_copy = ma->ma_attr;
1210         la_copy->la_valid = LA_CTIME | LA_MTIME;
1211         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
1212         if (rc)
1213                 GOTO(cleanup, rc);
1214
1215         /* return attr back */
1216         rc = mdd_attr_get_internal_locked(env, son, ma);
1217 cleanup:
1218         if (rc && created) {
1219                 int rc2 = 0;
1220
1221                 if (inserted) {
1222                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1223                                                  S_ISDIR(attr->la_mode),
1224                                                  handle, BYPASS_CAPA);
1225                         if (rc2)
1226                                 CERROR("error can not cleanup destroy %d\n",
1227                                        rc2);
1228                 }
1229                 if (rc2 == 0) {
1230                         mdd_write_lock(env, son);
1231                         mdd_ref_del_internal(env, son, handle);
1232                         mdd_write_unlock(env, son);
1233                 }
1234         }
1235         /* finish mdd_lov_create() stuff */
1236         mdd_lov_create_finish(env, mdd, rc);
1237         if (lmm && !spec->u.sp_ea.no_lov_create)
1238                 OBD_FREE(lmm, lmm_size);
1239         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1240 out_trans:
1241         mdd_trans_stop(env, mdd, rc, handle);
1242         mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1243         RETURN(rc);
1244 }
1245
1246 /*
1247  * Get locks on parents in proper order
1248  * RETURN: < 0 - error, rename_order if successful
1249  */
1250 enum rename_order {
1251         MDD_RN_SAME,
1252         MDD_RN_SRCTGT,
1253         MDD_RN_TGTSRC
1254 };
1255
1256 static int mdd_rename_order(const struct lu_env *env,
1257                             struct mdd_device *mdd,
1258                             struct mdd_object *src_pobj,
1259                             struct mdd_object *tgt_pobj)
1260 {
1261         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1262         int rc;
1263         ENTRY;
1264
1265         if (src_pobj == tgt_pobj)
1266                 RETURN(MDD_RN_SAME);
1267
1268         /* compared the parent child relationship of src_p&tgt_p */
1269         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1270                 rc = MDD_RN_SRCTGT;
1271         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1272                 rc = MDD_RN_TGTSRC;
1273         } else {
1274                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1275                 if (rc == -EREMOTE)
1276                         rc = 0;
1277
1278                 if (rc == 1)
1279                         rc = MDD_RN_TGTSRC;
1280                 else if (rc == 0)
1281                         rc = MDD_RN_SRCTGT;
1282         }
1283
1284         RETURN(rc);
1285 }
1286
1287 static int mdd_rename_sanity_check(const struct lu_env *env,
1288                                    struct mdd_object *src_pobj,
1289                                    struct mdd_object *tgt_pobj,
1290                                    const struct lu_fid *sfid,
1291                                    int src_is_dir,
1292                                    struct mdd_object *tobj)
1293 {
1294         int rc;
1295         ENTRY;
1296
1297         if (mdd_is_dead_obj(src_pobj))
1298                 RETURN(-ENOENT);
1299
1300         /* The sobj maybe on the remote, check parent permission only here */
1301         rc = mdd_permission_internal_locked(env, src_pobj,
1302                                             MAY_WRITE | MAY_EXEC);
1303         if (rc)
1304                 RETURN(rc);
1305
1306         if (!tobj) {
1307                 rc = mdd_may_create(env, tgt_pobj, NULL,
1308                                     (src_pobj != tgt_pobj));
1309         } else {
1310                 mdd_read_lock(env, tobj);
1311                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1312                                     (src_pobj != tgt_pobj));
1313                 if (rc == 0)
1314                         if (S_ISDIR(mdd_object_type(tobj))
1315                             && mdd_dir_is_empty(env, tobj))
1316                                 rc = -ENOTEMPTY;
1317                 mdd_read_unlock(env, tobj);
1318         }
1319
1320         RETURN(rc);
1321 }
1322 /* src object can be remote that is why we use only fid and type of object */
1323 static int mdd_rename(const struct lu_env *env,
1324                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1325                       const struct lu_fid *lf, const char *sname,
1326                       struct md_object *tobj, const char *tname,
1327                       struct md_attr *ma)
1328 {
1329         struct mdd_device *mdd = mdo2mdd(src_pobj);
1330         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1331         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1332         struct mdd_object *mdd_sobj = NULL;
1333         struct mdd_object *mdd_tobj = NULL;
1334         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1335         struct dynlock_handle *sdlh, *tdlh;
1336         struct thandle *handle;
1337         int is_dir;
1338         int rc;
1339         ENTRY;
1340
1341         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1342         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1343         if (ma->ma_attr.la_valid & LA_FLAGS &&
1344             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1345                 RETURN(-EPERM);
1346
1347         if (tobj)
1348                 mdd_tobj = md2mdd_obj(tobj);
1349
1350         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1351         handle = mdd_trans_start(env, mdd);
1352         if (IS_ERR(handle))
1353                 RETURN(PTR_ERR(handle));
1354
1355         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1356         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1357         if (rc < 0)
1358                 GOTO(cleanup_unlocked, rc);
1359
1360         /* get locks in determined order */
1361         if (rc == MDD_RN_SAME) {
1362                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1363                 /* check hashes to determine do we need one lock or two */
1364                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1365                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1366                 else
1367                         tdlh = sdlh;
1368         } else if (rc == MDD_RN_SRCTGT) {
1369                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1370                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1371         } else {
1372                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1373                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1374         }
1375         if (sdlh == NULL || tdlh == NULL)
1376                 GOTO(cleanup, rc = -ENOMEM);
1377
1378         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1379                                      lf, is_dir, mdd_tobj);
1380         if (rc)
1381                 GOTO(cleanup, rc);
1382
1383         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1384                                 mdd_object_capa(env, mdd_spobj));
1385         if (rc)
1386                 GOTO(cleanup, rc);
1387
1388         /*
1389          * Here tobj can be remote one, so we do index_delete unconditionally
1390          * and -ENOENT is allowed.
1391          */
1392         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1393                                 mdd_object_capa(env, mdd_tpobj));
1394         if (rc != 0 && rc != -ENOENT)
1395                 GOTO(cleanup, rc);
1396
1397         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1398                                 mdd_object_capa(env, mdd_tpobj));
1399         if (rc)
1400                 GOTO(cleanup, rc);
1401
1402         mdd_sobj = mdd_object_find(env, mdd, lf);
1403         *la_copy = ma->ma_attr;
1404         la_copy->la_valid = LA_CTIME;
1405         if (mdd_sobj) {
1406                 /*XXX: how to update ctime for remote sobj? */
1407                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
1408                                                   handle, 1);
1409                 if (rc)
1410                         GOTO(cleanup, rc);
1411         }
1412         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1413                 mdd_write_lock(env, mdd_tobj);
1414                 mdd_ref_del_internal(env, mdd_tobj, handle);
1415                 /* remove dot reference */
1416                 if (is_dir)
1417                         mdd_ref_del_internal(env, mdd_tobj, handle);
1418
1419                 la_copy->la_valid = LA_CTIME;
1420                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1421                 if (rc)
1422                         GOTO(cleanup, rc);
1423
1424                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1425                 mdd_write_unlock(env, mdd_tobj);
1426                 if (rc)
1427                         GOTO(cleanup, rc);
1428         }
1429
1430         la_copy->la_valid = LA_CTIME | LA_MTIME;
1431         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
1432         if (rc)
1433                 GOTO(cleanup, rc);
1434
1435         if (mdd_spobj != mdd_tpobj) {
1436                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1437                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
1438                                                   handle, 0);
1439         }
1440
1441 cleanup:
1442         if (likely(tdlh) && sdlh != tdlh)
1443                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1444         if (likely(sdlh))
1445                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1446 cleanup_unlocked:
1447         mdd_trans_stop(env, mdd, rc, handle);
1448         if (mdd_sobj)
1449                 mdd_object_put(env, mdd_sobj);
1450         RETURN(rc);
1451 }
1452
1453 struct md_dir_operations mdd_dir_ops = {
1454         .mdo_is_subdir     = mdd_is_subdir,
1455         .mdo_lookup        = mdd_lookup,
1456         .mdo_create        = mdd_create,
1457         .mdo_rename        = mdd_rename,
1458         .mdo_link          = mdd_link,
1459         .mdo_unlink        = mdd_unlink,
1460         .mdo_name_insert   = mdd_name_insert,
1461         .mdo_name_remove   = mdd_name_remove,
1462         .mdo_rename_tgt    = mdd_rename_tgt,
1463         .mdo_create_data   = mdd_create_data
1464 };