Whamcloud - gitweb
Branch b_new_cmd
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175
176         RETURN(rc);
177 }
178
179 /*Check whether it may create the cobj under the pobj*/
180 static int mdd_may_create(const struct lu_env *env,
181                           struct mdd_object *pobj, struct mdd_object *cobj,
182                           int need_check)
183 {
184         int rc = 0;
185         ENTRY;
186
187         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
188                 RETURN(-EEXIST);
189
190         if (mdd_is_dead_obj(pobj))
191                 RETURN(-ENOENT);
192
193         /*check pobj may create or not*/
194         if (need_check)
195                 rc = mdd_permission_internal_locked(env, pobj,
196                                              MAY_WRITE | MAY_EXEC);
197
198         RETURN(rc);
199 }
200
201 /*
202  * It's inline, so penalty for filesystems that don't use sticky bit is
203  * minimal.
204  */
205 static inline int mdd_is_sticky(const struct lu_env *env,
206                                 struct mdd_object *pobj,
207                                 struct mdd_object *cobj)
208 {
209         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
210         struct md_ucred *uc = md_ucred(env);
211         int rc;
212
213         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
214         if (rc) {
215                 return rc;
216         } else if (tmp_la->la_uid == uc->mu_fsuid) {
217                 return 0;
218         } else {
219                 mdd_read_lock(env, pobj);
220                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221                 mdd_read_unlock(env, pobj);
222                 if (rc)
223                         return rc;
224                 else if (!(tmp_la->la_mode & S_ISVTX))
225                         return 0;
226                 else if (tmp_la->la_uid == uc->mu_fsuid)
227                         return 0;
228                 else
229                         return !mdd_capable(uc, CAP_FOWNER);
230         }
231 }
232
233 /* Check whether it may delete the cobj under the pobj. */
234 static int mdd_may_delete(const struct lu_env *env,
235                           struct mdd_object *pobj,
236                           struct mdd_object *cobj,
237                           int is_dir, int need_check)
238 {
239         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
240         int rc = 0;
241         ENTRY;
242
243         LASSERT(cobj);
244
245         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
246                 RETURN(-ENOENT);
247
248         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
249                 RETURN(-EPERM);
250
251         if (is_dir) {
252                 if (!S_ISDIR(mdd_object_type(cobj)))
253                         RETURN(-ENOTDIR);
254
255                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
256                         RETURN(-EBUSY);
257
258         } else if (S_ISDIR(mdd_object_type(cobj))) {
259                         RETURN(-EISDIR);
260         }
261
262         if (pobj) {
263                 if (mdd_is_dead_obj(pobj))
264                         RETURN(-ENOENT);
265
266                 if (mdd_is_sticky(env, pobj, cobj))
267                         RETURN(-EPERM);
268
269                 if (need_check)
270                         rc = mdd_permission_internal_locked(env, pobj,
271                                                             MAY_WRITE |
272                                                             MAY_EXEC);
273         }
274         RETURN(rc);
275 }
276
277 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
278                           struct mdd_object *src_obj)
279 {
280         int rc = 0;
281         ENTRY;
282
283         if (tgt_obj) {
284                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
285                 if (rc)
286                         RETURN(rc);
287         }
288
289         if (S_ISDIR(mdd_object_type(src_obj)))
290                 RETURN(-EPERM);
291
292         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
293                 RETURN(-EPERM);
294
295         RETURN(rc);
296 }
297
298 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
299                                    const struct lu_fid *fid)
300 {
301         struct mdd_thread_info *info = mdd_env_info(env);
302
303         fid_cpu_to_be(&info->mti_fid2, fid);
304         return (const struct dt_rec *)&info->mti_fid2;
305 }
306
307
308 /* insert new index, add reference if isdir, update times */
309 static int __mdd_index_insert(const struct lu_env *env,
310                              struct mdd_object *pobj, const struct lu_fid *lf,
311                              const char *name, int isdir, struct thandle *th,
312                              struct lustre_capa *capa)
313 {
314         struct dt_object *next = mdd_object_child(pobj);
315         struct timeval start;
316         int rc;
317         ENTRY;
318
319         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
320                              LPROC_MDD_INDEX_INSERT);
321 #if 0
322         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
323 #endif
324
325         if (dt_try_as_dir(env, next))
326                 rc = next->do_index_ops->dio_insert(env, next,
327                                                     __mdd_fid_rec(env, lf),
328                                                     (const struct dt_key *)name,
329                                                     th, capa);
330         else
331                 rc = -ENOTDIR;
332
333         if (rc == 0) {
334                 if (isdir) {
335                         mdd_write_lock(env, pobj);
336                         mdd_ref_add_internal(env, pobj, th);
337                         mdd_write_unlock(env, pobj);
338                 }
339 #if 0
340                 la->la_valid = LA_MTIME|LA_CTIME;
341                 la->la_atime = ma->ma_attr.la_atime;
342                 la->la_ctime = ma->ma_attr.la_ctime;
343                 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
344 #endif
345         }
346         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
347                            LPROC_MDD_INDEX_INSERT);
348         return rc;
349 }
350
351 static int __mdd_index_delete(const struct lu_env *env,
352                               struct mdd_object *pobj, const char *name,
353                               int is_dir, struct thandle *handle,
354                               struct lustre_capa *capa)
355 {
356         struct dt_object *next = mdd_object_child(pobj);
357         struct timeval start;
358         int rc;
359         ENTRY;
360
361         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
362                              LPROC_MDD_INDEX_DELETE);
363         if (dt_try_as_dir(env, next)) {
364                 rc = next->do_index_ops->dio_delete(env, next,
365                                                     (struct dt_key *)name,
366                                                     handle, capa);
367                 if (rc == 0 && is_dir) {
368                         mdd_write_lock(env, pobj);
369                         mdd_ref_del_internal(env, pobj, handle);
370                         mdd_write_unlock(env, pobj);
371                 }
372         } else
373                 rc = -ENOTDIR;
374         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
375                            LPROC_MDD_INDEX_DELETE);
376         RETURN(rc);
377 }
378
379 static int __mdd_index_insert_only(const struct lu_env *env,
380                                    struct mdd_object *pobj,
381                                    const struct lu_fid *lf,
382                                    const char *name, struct thandle *th,
383                                    struct lustre_capa *capa)
384 {
385         int rc;
386         struct dt_object *next = mdd_object_child(pobj);
387         ENTRY;
388
389         if (dt_try_as_dir(env, next))
390                 rc = next->do_index_ops->dio_insert(env, next,
391                                          __mdd_fid_rec(env, lf),
392                                          (const struct dt_key *)name, th, capa);
393         else
394                 rc = -ENOTDIR;
395         RETURN(rc);
396 }
397
398 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
399                     struct md_object *src_obj, const char *name,
400                     struct md_attr *ma)
401 {
402         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
403         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
404         struct mdd_device *mdd = mdo2mdd(src_obj);
405         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
406         struct thandle *handle;
407         struct dynlock_handle *dlh;
408         int rc;
409         ENTRY;
410
411         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
412         handle = mdd_trans_start(env, mdd);
413         if (IS_ERR(handle))
414                 RETURN(PTR_ERR(handle));
415
416         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
417         if (dlh == NULL)
418                 GOTO(out_trans, rc = -ENOMEM);
419         mdd_write_lock(env, mdd_sobj);
420
421         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
422         if (rc)
423                 GOTO(out, rc);
424
425         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
426                                      name, handle,
427                                      mdd_object_capa(env, mdd_tobj));
428         if (rc == 0)
429                 mdd_ref_add_internal(env, mdd_sobj, handle);
430
431         *la_copy = ma->ma_attr;
432         la_copy->la_valid = LA_CTIME;
433         rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
434         if (rc)
435                 GOTO(out, rc);
436
437         la_copy->la_valid = LA_CTIME | LA_MTIME;
438         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
439
440 out:
441         mdd_write_unlock(env, mdd_sobj);
442         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
443 out_trans:
444         mdd_trans_stop(env, mdd, rc, handle);
445         RETURN(rc);
446 }
447
448 /* caller should take a lock before calling */
449 int mdd_finish_unlink(const struct lu_env *env,
450                       struct mdd_object *obj, struct md_attr *ma,
451                       struct thandle *th)
452 {
453         int rc;
454         ENTRY;
455
456         rc = mdd_iattr_get(env, obj, ma);
457         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
458                 /* add new orphan and the object
459                  * will be deleted during the object_put() */
460                 if (__mdd_orphan_add(env, obj, th) == 0)
461                         set_bit(LU_OBJECT_ORPHAN,
462                                 &mdd2lu_obj(obj)->lo_header->loh_flags);
463
464                 if (obj->mod_count == 0)
465                         rc = mdd_object_kill(env, obj, ma);
466                 else
467                         /* clear MA_LOV | MA_COOKIE, if we do not
468                          * unlink it in case we get it somewhere */
469                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
470         } else
471                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
472
473         RETURN(rc);
474 }
475
476 /*
477  * Check that @dir contains no entries except (possibly) dot and dotdot.
478  *
479  * Returns:
480  *
481  *             0        empty
482  *    -ENOTEMPTY        not empty
483  *           -ve        other error
484  *
485  */
486 static int mdd_dir_is_empty(const struct lu_env *env,
487                             struct mdd_object *dir)
488 {
489         struct dt_it     *it;
490         struct dt_object *obj;
491         struct dt_it_ops *iops;
492         int result;
493         ENTRY;
494
495         obj = mdd_object_child(dir);
496         iops = &obj->do_index_ops->dio_it;
497         it = iops->init(env, obj, 0, BYPASS_CAPA);
498         if (it != NULL) {
499                 result = iops->get(env, it, (const void *)"");
500                 if (result > 0) {
501                         int i;
502                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
503                                 result = iops->next(env, it);
504                         if (result == 0)
505                                 result = -ENOTEMPTY;
506                         else if (result == +1)
507                                 result = 0;
508                 } else if (result == 0)
509                         /*
510                          * Huh? Index contains no zero key?
511                          */
512                         result = -EIO;
513
514                 iops->put(env, it);
515                 iops->fini(env, it);
516         } else
517                 result = -ENOMEM;
518         RETURN(result);
519 }
520
521 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
522                             struct mdd_object *cobj, struct md_attr *ma)
523 {
524         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
525         int rc = 0;
526         ENTRY;
527
528         rc = mdd_may_delete(env, pobj, cobj,
529                             S_ISDIR(ma->ma_attr.la_mode), 1);
530         if (rc)
531                 RETURN(rc);
532
533         if (S_ISDIR(mdd_object_type(cobj))) {
534                 if (dt_try_as_dir(env, dt_cobj))
535                         rc = mdd_dir_is_empty(env, cobj);
536                 else
537                         rc = -ENOTDIR;
538         }
539
540         RETURN(rc);
541 }
542
543 static int mdd_unlink(const struct lu_env *env,
544                       struct md_object *pobj, struct md_object *cobj,
545                       const char *name, struct md_attr *ma)
546 {
547         struct mdd_device *mdd = mdo2mdd(pobj);
548         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
549         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
550         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
551         struct thandle    *handle;
552         struct dynlock_handle *dlh;
553         int rc, is_dir;
554         ENTRY;
555
556         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
557         if (rc)
558                 RETURN(rc);
559
560         handle = mdd_trans_start(env, mdd);
561         if (IS_ERR(handle))
562                 RETURN(PTR_ERR(handle));
563
564         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
565         if (dlh == NULL)
566                 GOTO(out_trans, rc = -ENOMEM);
567         mdd_write_lock(env, mdd_cobj);
568
569         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
570         if (rc)
571                 GOTO(cleanup, rc);
572
573         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
574         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
575                                 mdd_object_capa(env, mdd_pobj));
576         if (rc)
577                 GOTO(cleanup, rc);
578
579         mdd_ref_del_internal(env, mdd_cobj, handle);
580         *la_copy = ma->ma_attr;
581         if (is_dir) {
582                 /* unlink dot */
583                 mdd_ref_del_internal(env, mdd_cobj, handle);
584         } else {
585                 la_copy->la_valid = LA_CTIME;
586                 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
587                 if (rc)
588                         GOTO(cleanup, rc);
589         }
590
591         la_copy->la_valid = LA_CTIME | LA_MTIME;
592         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
593         if (rc)
594                 GOTO(cleanup, rc);
595
596         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
597
598         if (rc == 0)
599                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
600                                    strlen("unlinked"), "unlinked", 0,
601                                    NULL, NULL);
602 cleanup:
603         mdd_write_unlock(env, mdd_cobj);
604         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
605 out_trans:
606         mdd_trans_stop(env, mdd, rc, handle);
607         RETURN(rc);
608 }
609
610 /*
611  * Partial operation. Be aware, this is called with write lock taken, so we use
612  * locksless version of __mdd_lookup() here.
613  */
614 static int mdd_ni_sanity_check(const struct lu_env *env,
615                                struct md_object *pobj,
616                                const char *name,
617                                const struct lu_fid *fid)
618 {
619         struct mdd_object *obj       = md2mdd_obj(pobj);
620 #if 0
621         int rc;
622 #endif
623         ENTRY;
624
625         /* EEXIST check */
626         if (mdd_is_dead_obj(obj))
627                 RETURN(-ENOENT);
628
629          /* The exist of the name will be checked in _index_insert. */
630 #if 0
631         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
632         if (rc != -ENOENT)
633                 RETURN(rc ? : -EEXIST);
634         else
635                 RETURN(0);
636 #endif
637         RETURN(mdd_permission_internal_locked(env, obj,
638                                               MAY_WRITE | MAY_EXEC));
639 }
640
641 static int mdd_name_insert(const struct lu_env *env,
642                            struct md_object *pobj,
643                            const char *name, const struct lu_fid *fid,
644                            int isdir)
645 {
646         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
647         struct mdd_device *mdd = mdo2mdd(pobj);
648         struct thandle *handle;
649         struct dynlock_handle *dlh;
650         int rc;
651         ENTRY;
652
653         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
654         handle = mdd_trans_start(env, mdo2mdd(pobj));
655         if (IS_ERR(handle))
656                 RETURN(PTR_ERR(handle));
657
658         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
659         if (dlh == NULL)
660                 GOTO(out_trans, rc = -ENOMEM);
661         rc = mdd_ni_sanity_check(env, pobj, name, fid);
662         if (rc)
663                 GOTO(out_unlock, rc);
664
665         rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
666                                 BYPASS_CAPA);
667
668 out_unlock:
669         mdd_pdo_write_unlock(env, mdd_obj, dlh);
670 out_trans:
671         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
672         RETURN(rc);
673 }
674
675 /*
676  * Be aware, this is called with write lock taken, so we use locksless version
677  * of __mdd_lookup() here.
678  */
679 static int mdd_nr_sanity_check(const struct lu_env *env,
680                                struct md_object *pobj,
681                                const char *name)
682 {
683         struct mdd_object *obj       = md2mdd_obj(pobj);
684 #if 0
685         struct mdd_thread_info *info = mdd_env_info(env);
686         struct lu_fid     *fid       = &info->mti_fid;
687         int rc;
688 #endif
689         ENTRY;
690
691         /* EEXIST check */
692         if (mdd_is_dead_obj(obj))
693                 RETURN(-ENOENT);
694
695          /* The exist of the name will be checked in _index_delete. */
696 #if 0
697         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
698         RETURN(rc);
699 #endif
700         RETURN(mdd_permission_internal_locked(env, obj,
701                                               MAY_WRITE | MAY_EXEC));
702 }
703
704 static int mdd_name_remove(const struct lu_env *env,
705                            struct md_object *pobj,
706                            const char *name, int is_dir)
707 {
708         struct mdd_device *mdd = mdo2mdd(pobj);
709         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
710         struct thandle *handle;
711         struct dynlock_handle *dlh;
712         int rc;
713         ENTRY;
714
715         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
716         handle = mdd_trans_start(env, mdd);
717         if (IS_ERR(handle))
718                 RETURN(PTR_ERR(handle));
719
720         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
721         if (dlh == NULL)
722                 GOTO(out_trans, rc = -ENOMEM);
723         rc = mdd_nr_sanity_check(env, pobj, name);
724         if (rc)
725                 GOTO(out_unlock, rc);
726
727         rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
728                                 BYPASS_CAPA);
729
730 out_unlock:
731         mdd_pdo_write_unlock(env, mdd_obj, dlh);
732 out_trans:
733         mdd_trans_stop(env, mdd, rc, handle);
734         RETURN(rc);
735 }
736 static int mdd_rt_sanity_check(const struct lu_env *env,
737                                struct mdd_object *tgt_pobj,
738                                struct mdd_object *tobj,
739                                const struct lu_fid *sfid,
740                                const char *name, struct md_attr *ma)
741 {
742         int rc, src_is_dir;
743         ENTRY;
744
745         /* EEXIST check */
746         if (mdd_is_dead_obj(tgt_pobj))
747                 RETURN(-ENOENT);
748
749         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
750         if (tobj) {
751                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
752                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
753                      mdd_dir_is_empty(env, tobj))
754                                 RETURN(-ENOTEMPTY);
755         } else {
756                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
757         }
758
759         RETURN(rc);
760 }
761
762 static int mdd_rename_tgt(const struct lu_env *env,
763                           struct md_object *pobj, struct md_object *tobj,
764                           const struct lu_fid *lf, const char *name,
765                           struct md_attr *ma)
766 {
767         struct mdd_device *mdd = mdo2mdd(pobj);
768         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
769         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
770         struct thandle *handle;
771         struct dynlock_handle *dlh;
772         int rc;
773         ENTRY;
774
775         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
776         handle = mdd_trans_start(env, mdd);
777         if (IS_ERR(handle))
778                 RETURN(PTR_ERR(handle));
779
780         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
781         if (dlh == NULL)
782                 GOTO(out_trans, rc = -ENOMEM);
783         if (mdd_tobj)
784                 mdd_write_lock(env, mdd_tobj);
785
786         /*TODO rename sanity checking*/
787         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
788         if (rc)
789                 GOTO(cleanup, rc);
790
791         /* if rename_tgt is called then we should just re-insert name with
792          * correct fid, no need to dec/inc parent nlink if obj is dir */
793         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
794         if (rc)
795                 GOTO(cleanup, rc);
796
797         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
798                                      BYPASS_CAPA);
799         if (rc)
800                 GOTO(cleanup, rc);
801
802         if (tobj && lu_object_exists(&tobj->mo_lu))
803                 mdd_ref_del_internal(env, mdd_tobj, handle);
804 cleanup:
805         if (tobj)
806                 mdd_write_unlock(env, mdd_tobj);
807         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
808 out_trans:
809         mdd_trans_stop(env, mdd, rc, handle);
810         RETURN(rc);
811 }
812
813 /*
814  * The permission has been checked when obj created,
815  * no need check again.
816  */
817 static int mdd_cd_sanity_check(const struct lu_env *env,
818                                struct mdd_object *obj)
819 {
820         int rc = 0;
821         ENTRY;
822
823         /* EEXIST check */
824         if (!obj || mdd_is_dead_obj(obj))
825                 RETURN(-ENOENT);
826
827 #if 0
828         mdd_read_lock(env, obj);
829         rc = mdd_permission_internal(env, obj, MAY_WRITE);
830         mdd_read_unlock(env, obj);
831 #endif
832
833         RETURN(rc);
834
835 }
836
837 static int mdd_create_data(const struct lu_env *env,
838                            struct md_object *pobj, struct md_object *cobj,
839                            const struct md_create_spec *spec,
840                            struct md_attr *ma)
841 {
842         struct mdd_device *mdd = mdo2mdd(cobj);
843         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
844         struct mdd_object *son = md2mdd_obj(cobj);
845         struct lu_attr    *attr = &ma->ma_attr;
846         struct lov_mds_md *lmm = NULL;
847         int                lmm_size = 0;
848         struct thandle    *handle;
849         int                rc;
850         ENTRY;
851
852         rc = mdd_cd_sanity_check(env, son);
853         if (rc)
854                 RETURN(rc);
855
856         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
857                         !(spec->sp_cr_flags & FMODE_WRITE))
858                 RETURN(0);
859         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
860                             attr);
861         if (rc)
862                 RETURN(rc);
863
864         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
865         handle = mdd_trans_start(env, mdd);
866         if (IS_ERR(handle))
867                 RETURN(rc = PTR_ERR(handle));
868
869         /*
870          * XXX: Setting the lov ea is not locked but setting the attr is locked?
871          */
872
873         /* Replay creates has objects already */
874         if (spec->u.sp_ea.no_lov_create) {
875                 CDEBUG(D_INFO, "we already have lov ea\n");
876                 rc = mdd_lov_set_md(env, mdd_pobj, son,
877                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
878                                     spec->u.sp_ea.eadatalen, handle, 0);
879         } else
880                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
881                                     lmm_size, handle, 0);
882
883         if (rc == 0)
884                rc = mdd_attr_get_internal_locked(env, son, ma);
885
886         /* Finish mdd_lov_create() stuff. */
887         mdd_lov_create_finish(env, mdd, rc);
888         mdd_trans_stop(env, mdd, rc, handle);
889         if (lmm)
890                 OBD_FREE(lmm, lmm_size);
891         RETURN(rc);
892 }
893
894 static int
895 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
896              const char *name, struct lu_fid* fid, int mask)
897 {
898         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
899         struct dt_object    *dir = mdd_object_child(mdd_obj);
900         struct dt_rec       *rec = (struct dt_rec *)fid;
901         const struct dt_key *key = (const struct dt_key *)name;
902         struct timeval      start;
903         int rc;
904         ENTRY;
905
906         mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
907         if (mdd_is_dead_obj(mdd_obj))
908                 RETURN(-ESTALE);
909
910         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
911         if (rc == 0)
912                 RETURN(-ESTALE);
913         else if (rc < 0) {
914                 CERROR("Object "DFID" locates on remote server\n",
915                         PFID(mdo2fid(mdd_obj)));
916                 LBUG();
917         }
918
919 #if 0
920         if (mask == MAY_EXEC)
921                 rc = mdd_exec_permission_lite(env, mdd_obj);
922         else
923 #endif
924         rc = mdd_permission_internal_locked(env, mdd_obj, mask);
925         if (rc)
926                 RETURN(rc);
927
928         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
929                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
930                                                    mdd_object_capa(env, mdd_obj));
931                 if (rc == 0)
932                         fid_be_to_cpu(fid, fid);
933         } else
934                 rc = -ENOTDIR;
935
936         mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
937         RETURN(rc);
938 }
939
940 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
941                           struct mdd_object *child, struct md_attr *ma,
942                           struct thandle *handle)
943 {
944         int rc;
945         ENTRY;
946
947         /* update attributes for child.
948          * FIXME:
949          *  (1) the valid bits should be converted between Lustre and Linux;
950          *  (2) maybe, the child attributes should be set in OSD when creation.
951          */
952
953         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
954         if (rc != 0)
955                 RETURN(rc);
956
957         if (S_ISDIR(ma->ma_attr.la_mode)) {
958                 /* add . and .. for newly created dir */
959                 mdd_ref_add_internal(env, child, handle);
960                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
961                                              dot, handle, BYPASS_CAPA);
962                 if (rc == 0) {
963                         rc = __mdd_index_insert_only(env, child, pfid,
964                                                      dotdot, handle,
965                                                      BYPASS_CAPA);
966                         if (rc != 0) {
967                                 int rc2;
968
969                                 rc2 = __mdd_index_delete(env, child, dot, 0,
970                                                          handle, BYPASS_CAPA);
971                                 if (rc2 != 0)
972                                         CERROR("Failure to cleanup after dotdot"
973                                                " creation: %d (%d)\n", rc2, rc);
974                                 else
975                                         mdd_ref_del_internal(env, child, handle);
976                         }
977                 }
978         }
979         RETURN(rc);
980 }
981
982 static int mdd_create_sanity_check(const struct lu_env *env,
983                                    struct md_object *pobj,
984                                    const char *name, struct md_attr *ma)
985 {
986         struct mdd_thread_info *info = mdd_env_info(env);
987         struct lu_attr    *la        = &info->mti_la;
988         struct lu_fid     *fid       = &info->mti_fid;
989         struct mdd_object *obj       = md2mdd_obj(pobj);
990         int rc;
991         ENTRY;
992
993         /* EEXIST check */
994         if (mdd_is_dead_obj(obj))
995                 RETURN(-ENOENT);
996
997         /*
998          * Check if the name already exist, though it will be checked
999          * in _index_insert also, for avoiding rolling back if exists
1000          * _index_insert.
1001          */
1002         rc = __mdd_lookup_locked(env, pobj, name, fid,
1003                                  MAY_WRITE | MAY_EXEC);
1004         if (rc != -ENOENT)
1005                 RETURN(rc ? : -EEXIST);
1006
1007         /* sgid check */
1008         mdd_read_lock(env, obj);
1009         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1010         mdd_read_unlock(env, obj);
1011         if (rc != 0)
1012                 RETURN(rc);
1013
1014         if (la->la_mode & S_ISGID) {
1015                 ma->ma_attr.la_gid = la->la_gid;
1016                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1017                         ma->ma_attr.la_mode |= S_ISGID;
1018                         ma->ma_attr.la_valid |= LA_MODE;
1019                 }
1020         }
1021
1022         switch (ma->ma_attr.la_mode & S_IFMT) {
1023         case S_IFREG:
1024         case S_IFDIR:
1025         case S_IFLNK:
1026         case S_IFCHR:
1027         case S_IFBLK:
1028         case S_IFIFO:
1029         case S_IFSOCK:
1030                 rc = 0;
1031                 break;
1032         default:
1033                 rc = -EINVAL;
1034                 break;
1035         }
1036         RETURN(rc);
1037 }
1038
1039 /*
1040  * Create object and insert it into namespace.
1041  */
1042 static int mdd_create(const struct lu_env *env,
1043                       struct md_object *pobj, const char *name,
1044                       struct md_object *child,
1045                       struct md_create_spec *spec,
1046                       struct md_attr* ma)
1047 {
1048         struct mdd_device *mdd = mdo2mdd(pobj);
1049         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1050         struct mdd_object *son = md2mdd_obj(child);
1051         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1052         struct lu_attr    *attr = &ma->ma_attr;
1053         struct lov_mds_md *lmm = NULL;
1054         struct thandle    *handle;
1055         int rc, created = 0, inserted = 0, lmm_size = 0;
1056         struct dynlock_handle *dlh;
1057         struct timeval  start;
1058         ENTRY;
1059
1060         mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1061         /*
1062          * Two operations have to be performed:
1063          *
1064          *  - allocation of new object (->do_create()), and
1065          *
1066          *  - insertion into parent index (->dio_insert()).
1067          *
1068          * Due to locking, operation order is not important, when both are
1069          * successful, *but* error handling cases are quite different:
1070          *
1071          *  - if insertion is done first, and following object creation fails,
1072          *  insertion has to be rolled back, but this operation might fail
1073          *  also leaving us with dangling index entry.
1074          *
1075          *  - if creation is done first, is has to be undone if insertion
1076          *  fails, leaving us with leaked space, which is neither good, nor
1077          *  fatal.
1078          *
1079          * It seems that creation-first is simplest solution, but it is
1080          * sub-optimal in the frequent
1081          *
1082          *         $ mkdir foo
1083          *         $ mkdir foo
1084          *
1085          * case, because second mkdir is bound to create object, only to
1086          * destroy it immediately.
1087          *
1088          * To avoid this follow local file systems that do double lookup:
1089          *
1090          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1091          *
1092          *     1. create            (mdd_object_create_internal())
1093          *
1094          *     2. insert            (__mdd_index_insert(), lookup again)
1095          */
1096
1097         /* sanity checks before big job */
1098         rc = mdd_create_sanity_check(env, pobj, name, ma);
1099         if (rc)
1100                 RETURN(rc);
1101
1102         /* no RPC inside the transaction, so OST objects should be created at
1103          * first */
1104         if (S_ISREG(attr->la_mode)) {
1105                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1106                                     spec, attr);
1107                 if (rc)
1108                         RETURN(rc);
1109         }
1110
1111         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1112         handle = mdd_trans_start(env, mdd);
1113         if (IS_ERR(handle))
1114                 RETURN(PTR_ERR(handle));
1115
1116         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1117         if (dlh == NULL)
1118                 GOTO(out_trans, rc = -ENOMEM);
1119
1120         /*
1121          * XXX check that link can be added to the parent in mkdir case.
1122          */
1123
1124         mdd_write_lock(env, son);
1125         rc = mdd_object_create_internal(env, son, ma, handle);
1126         if (rc) {
1127                 mdd_write_unlock(env, son);
1128                 GOTO(cleanup, rc);
1129         }
1130
1131         created = 1;
1132
1133 #ifdef CONFIG_FS_POSIX_ACL
1134         mdd_read_lock(env, mdd_pobj);
1135         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1136         mdd_read_unlock(env, mdd_pobj);
1137         if (rc) {
1138                 mdd_write_unlock(env, son);
1139                 GOTO(cleanup, rc);
1140         } else {
1141                 ma->ma_attr.la_valid |= LA_MODE;
1142         }
1143 #endif
1144
1145         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1146                                    son, ma, handle);
1147         mdd_write_unlock(env, son);
1148         if (rc)
1149                 /*
1150                  * Object has no links, so it will be destroyed when last
1151                  * reference is released. (XXX not now.)
1152                  */
1153                 GOTO(cleanup, rc);
1154
1155         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1156                                 name, S_ISDIR(attr->la_mode), handle,
1157                                 mdd_object_capa(env, mdd_pobj));
1158
1159         if (rc)
1160                 GOTO(cleanup, rc);
1161
1162         inserted = 1;
1163         /* replay creates has objects already */
1164         if (spec->u.sp_ea.no_lov_create) {
1165                 CDEBUG(D_INFO, "we already have lov ea\n");
1166                 LASSERT(lmm == NULL);
1167                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1168                 lmm_size = spec->u.sp_ea.eadatalen;
1169         }
1170         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1171         if (rc) {
1172                 CERROR("error on stripe info copy %d \n", rc);
1173                 GOTO(cleanup, rc);
1174         }
1175         if (lmm && lmm_size > 0) {
1176                 /* set Lov here, do not get lmm again later */
1177                 memcpy(ma->ma_lmm, lmm, lmm_size);
1178                 ma->ma_lmm_size = lmm_size;
1179                 ma->ma_valid |= MA_LOV;
1180         }
1181
1182         if (S_ISLNK(attr->la_mode)) {
1183                 struct dt_object *dt = mdd_object_child(son);
1184                 const char *target_name = spec->u.sp_symname;
1185                 int sym_len = strlen(target_name);
1186                 const struct lu_buf *buf;
1187                 loff_t pos = 0;
1188
1189                 buf = mdd_buf_get_const(env, target_name, sym_len);
1190                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1191                                                 mdd_object_capa(env, son));
1192                 if (rc == sym_len)
1193                         rc = 0;
1194                 else
1195                         rc = -EFAULT;
1196         }
1197
1198         *la_copy = ma->ma_attr;
1199         la_copy->la_valid = LA_CTIME | LA_MTIME;
1200         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
1201         if (rc)
1202                 GOTO(cleanup, rc);
1203
1204         /* return attr back */
1205         rc = mdd_attr_get_internal_locked(env, son, ma);
1206 cleanup:
1207         if (rc && created) {
1208                 int rc2 = 0;
1209
1210                 if (inserted) {
1211                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1212                                                  S_ISDIR(attr->la_mode),
1213                                                  handle, BYPASS_CAPA);
1214                         if (rc2)
1215                                 CERROR("error can not cleanup destroy %d\n",
1216                                        rc2);
1217                 }
1218                 if (rc2 == 0) {
1219                         mdd_write_lock(env, son);
1220                         mdd_ref_del_internal(env, son, handle);
1221                         mdd_write_unlock(env, son);
1222                 }
1223         }
1224         /* finish mdd_lov_create() stuff */
1225         mdd_lov_create_finish(env, mdd, rc);
1226         if (lmm && !spec->u.sp_ea.no_lov_create)
1227                 OBD_FREE(lmm, lmm_size);
1228         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1229 out_trans:
1230         mdd_trans_stop(env, mdd, rc, handle);
1231         mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1232         RETURN(rc);
1233 }
1234
1235 /*
1236  * Get locks on parents in proper order
1237  * RETURN: < 0 - error, rename_order if successful
1238  */
1239 enum rename_order {
1240         MDD_RN_SAME,
1241         MDD_RN_SRCTGT,
1242         MDD_RN_TGTSRC
1243 };
1244
1245 static int mdd_rename_order(const struct lu_env *env,
1246                             struct mdd_device *mdd,
1247                             struct mdd_object *src_pobj,
1248                             struct mdd_object *tgt_pobj)
1249 {
1250         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1251         int rc;
1252         ENTRY;
1253
1254         if (src_pobj == tgt_pobj)
1255                 RETURN(MDD_RN_SAME);
1256
1257         /* compared the parent child relationship of src_p&tgt_p */
1258         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1259                 rc = MDD_RN_SRCTGT;
1260         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1261                 rc = MDD_RN_TGTSRC;
1262         } else {
1263                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1264                 if (rc == -EREMOTE)
1265                         rc = 0;
1266
1267                 if (rc == 1)
1268                         rc = MDD_RN_TGTSRC;
1269                 else if (rc == 0)
1270                         rc = MDD_RN_SRCTGT;
1271         }
1272
1273         RETURN(rc);
1274 }
1275
1276 static int mdd_rename_sanity_check(const struct lu_env *env,
1277                                    struct mdd_object *src_pobj,
1278                                    struct mdd_object *tgt_pobj,
1279                                    const struct lu_fid *sfid,
1280                                    int src_is_dir,
1281                                    struct mdd_object *tobj)
1282 {
1283         int rc;
1284         ENTRY;
1285
1286         if (mdd_is_dead_obj(src_pobj))
1287                 RETURN(-ENOENT);
1288
1289         /* The sobj maybe on the remote, check parent permission only here */
1290         rc = mdd_permission_internal_locked(env, src_pobj,
1291                                             MAY_WRITE | MAY_EXEC);
1292         if (rc)
1293                 RETURN(rc);
1294
1295         if (!tobj) {
1296                 rc = mdd_may_create(env, tgt_pobj, NULL,
1297                                     (src_pobj != tgt_pobj));
1298         } else {
1299                 mdd_read_lock(env, tobj);
1300                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1301                                     (src_pobj != tgt_pobj));
1302                 if (rc == 0)
1303                         if (S_ISDIR(mdd_object_type(tobj))
1304                             && mdd_dir_is_empty(env, tobj))
1305                                 rc = -ENOTEMPTY;
1306                 mdd_read_unlock(env, tobj);
1307         }
1308
1309         RETURN(rc);
1310 }
1311 /* src object can be remote that is why we use only fid and type of object */
1312 static int mdd_rename(const struct lu_env *env,
1313                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1314                       const struct lu_fid *lf, const char *sname,
1315                       struct md_object *tobj, const char *tname,
1316                       struct md_attr *ma)
1317 {
1318         struct mdd_device *mdd = mdo2mdd(src_pobj);
1319         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1320         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1321         struct mdd_object *mdd_sobj = NULL;
1322         struct mdd_object *mdd_tobj = NULL;
1323         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1324         struct dynlock_handle *sdlh, *tdlh;
1325         struct thandle *handle;
1326         int is_dir;
1327         int rc;
1328         ENTRY;
1329
1330         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1331         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1332         if (ma->ma_attr.la_valid & LA_FLAGS &&
1333             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1334                 RETURN(-EPERM);
1335
1336         if (tobj)
1337                 mdd_tobj = md2mdd_obj(tobj);
1338
1339         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1340         handle = mdd_trans_start(env, mdd);
1341         if (IS_ERR(handle))
1342                 RETURN(PTR_ERR(handle));
1343
1344         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1345         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1346         if (rc < 0)
1347                 GOTO(cleanup_unlocked, rc);
1348
1349         /* get locks in determined order */
1350         if (rc == MDD_RN_SAME) {
1351                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1352                 /* check hashes to determine do we need one lock or two */
1353                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1354                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1355                 else
1356                         tdlh = sdlh;
1357         } else if (rc == MDD_RN_SRCTGT) {
1358                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1359                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1360         } else {
1361                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1362                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1363         }
1364         if (sdlh == NULL || tdlh == NULL)
1365                 GOTO(cleanup, rc = -ENOMEM);
1366
1367         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1368                                      lf, is_dir, mdd_tobj);
1369         if (rc)
1370                 GOTO(cleanup, rc);
1371
1372         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1373                                 mdd_object_capa(env, mdd_spobj));
1374         if (rc)
1375                 GOTO(cleanup, rc);
1376
1377         /*
1378          * Here tobj can be remote one, so we do index_delete unconditionally
1379          * and -ENOENT is allowed.
1380          */
1381         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1382                                 mdd_object_capa(env, mdd_tpobj));
1383         if (rc != 0 && rc != -ENOENT)
1384                 GOTO(cleanup, rc);
1385
1386         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1387                                 mdd_object_capa(env, mdd_tpobj));
1388         if (rc)
1389                 GOTO(cleanup, rc);
1390
1391         mdd_sobj = mdd_object_find(env, mdd, lf);
1392         *la_copy = ma->ma_attr;
1393         la_copy->la_valid = LA_CTIME;
1394         if (mdd_sobj) {
1395                 /*XXX: how to update ctime for remote sobj? */
1396                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
1397                                                   handle, 1);
1398                 if (rc)
1399                         GOTO(cleanup, rc);
1400         }
1401         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1402                 mdd_write_lock(env, mdd_tobj);
1403                 mdd_ref_del_internal(env, mdd_tobj, handle);
1404                 /* remove dot reference */
1405                 if (is_dir)
1406                         mdd_ref_del_internal(env, mdd_tobj, handle);
1407
1408                 la_copy->la_valid = LA_CTIME;
1409                 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1410                 if (rc)
1411                         GOTO(cleanup, rc);
1412
1413                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1414                 mdd_write_unlock(env, mdd_tobj);
1415                 if (rc)
1416                         GOTO(cleanup, rc);
1417         }
1418
1419         la_copy->la_valid = LA_CTIME | LA_MTIME;
1420         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
1421         if (rc)
1422                 GOTO(cleanup, rc);
1423
1424         if (mdd_spobj != mdd_tpobj) {
1425                 la_copy->la_valid = LA_CTIME | LA_MTIME;
1426                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
1427                                                   handle, 0);
1428         }
1429
1430 cleanup:
1431         if (likely(tdlh) && sdlh != tdlh)
1432                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1433         if (likely(sdlh))
1434                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1435 cleanup_unlocked:
1436         mdd_trans_stop(env, mdd, rc, handle);
1437         if (mdd_sobj)
1438                 mdd_object_put(env, mdd_sobj);
1439         RETURN(rc);
1440 }
1441
1442 struct md_dir_operations mdd_dir_ops = {
1443         .mdo_is_subdir     = mdd_is_subdir,
1444         .mdo_lookup        = mdd_lookup,
1445         .mdo_create        = mdd_create,
1446         .mdo_rename        = mdd_rename,
1447         .mdo_link          = mdd_link,
1448         .mdo_unlink        = mdd_unlink,
1449         .mdo_name_insert   = mdd_name_insert,
1450         .mdo_name_remove   = mdd_name_remove,
1451         .mdo_rename_tgt    = mdd_rename_tgt,
1452         .mdo_create_data   = mdd_create_data
1453 };