Whamcloud - gitweb
- in cmm_split_check() passed to OBD_FREE correct size (though this is not bug, as...
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid, struct md_op_spec *spec)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check)
200                 rc = mdd_permission_internal_locked(env, pobj, NULL,
201                                                     MAY_WRITE | MAY_EXEC);
202
203         RETURN(rc);
204 }
205
206 static inline int mdd_is_sticky(const struct lu_env *env,
207                                 struct mdd_object *pobj,
208                                 struct mdd_object *cobj)
209 {
210         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211         struct md_ucred *uc = md_ucred(env);
212         int rc;
213
214         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
215         if (rc) {
216                 return rc;
217         } else if (tmp_la->la_uid == uc->mu_fsuid) {
218                 return 0;
219         } else {
220                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221                 if (rc)
222                         return rc;
223                 else if (!(tmp_la->la_mode & S_ISVTX) ||
224                          (tmp_la->la_uid == uc->mu_fsuid))
225                         return 0;
226                 else
227                         return !mdd_capable(uc, CAP_FOWNER);
228         }
229 }
230
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233                           struct mdd_object *pobj,
234                           struct mdd_object *cobj,
235                           int is_dir, int need_check)
236 {
237         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
238         int rc = 0;
239         ENTRY;
240
241         LASSERT(cobj);
242
243         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
244                 RETURN(-ENOENT);
245
246         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
247                 RETURN(-EPERM);
248
249         if (is_dir) {
250                 if (!S_ISDIR(mdd_object_type(cobj)))
251                         RETURN(-ENOTDIR);
252
253                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
254                         RETURN(-EBUSY);
255
256         } else if (S_ISDIR(mdd_object_type(cobj))) {
257                 RETURN(-EISDIR);
258         }
259
260         if (pobj) {
261                 if (mdd_is_dead_obj(pobj))
262                         RETURN(-ENOENT);
263
264                 if (mdd_is_sticky(env, pobj, cobj))
265                         RETURN(-EPERM);
266
267                 if (need_check)
268                         rc = mdd_permission_internal_locked(env, pobj, NULL,
269                                                      MAY_WRITE | MAY_EXEC);
270         }
271         RETURN(rc);
272 }
273
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275                           struct mdd_object *src_obj)
276 {
277         int rc = 0;
278         ENTRY;
279
280         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
281                 RETURN(-EPERM);
282
283         if (S_ISDIR(mdd_object_type(src_obj)))
284                 RETURN(-EPERM);
285
286         LASSERT(src_obj != tgt_obj);
287         if (tgt_obj) {
288                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
289                 if (rc)
290                         RETURN(rc);
291         }
292
293         RETURN(rc);
294 }
295
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297                                    const struct lu_fid *fid)
298 {
299         struct mdd_thread_info *info = mdd_env_info(env);
300
301         fid_cpu_to_be(&info->mti_fid2, fid);
302         return (const struct dt_rec *)&info->mti_fid2;
303 }
304
305
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308                               const struct lu_fid *lf, const char *name, int is_dir,
309                               struct thandle *handle, struct lustre_capa *capa)
310 {
311         struct dt_object *next = mdd_object_child(pobj);
312         struct timeval    start;
313         int               rc;
314         ENTRY;
315
316         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317                                LPROC_MDD_INDEX_INSERT);
318         if (dt_try_as_dir(env, next)) {
319                 rc = next->do_index_ops->dio_insert(env, next,
320                                                     __mdd_fid_rec(env, lf),
321                                                     (const struct dt_key *)name,
322                                                     handle, capa);
323         } else {
324                 rc = -ENOTDIR;
325         }
326
327         if (rc == 0) {
328                 if (is_dir) {
329                         mdd_write_lock(env, pobj);
330                         mdd_ref_add_internal(env, pobj, handle);
331                         mdd_write_unlock(env, pobj);
332                 }
333         }
334         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335                              LPROC_MDD_INDEX_INSERT);
336         RETURN(rc);
337 }
338
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340                               const char *name, int is_dir, struct thandle *handle,
341                               struct lustre_capa *capa)
342 {
343         struct dt_object *next = mdd_object_child(pobj);
344         struct timeval    start;
345         int               rc;
346         ENTRY;
347
348         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349                                LPROC_MDD_INDEX_DELETE);
350
351         if (dt_try_as_dir(env, next)) {
352                 rc = next->do_index_ops->dio_delete(env, next,
353                                                     (struct dt_key *)name,
354                                                     handle, capa);
355                 if (rc == 0 && is_dir) {
356                         mdd_write_lock(env, pobj);
357                         mdd_ref_del_internal(env, pobj, handle);
358                         mdd_write_unlock(env, pobj);
359                 }
360         } else
361                 rc = -ENOTDIR;
362
363         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364                              LPROC_MDD_INDEX_DELETE);
365         RETURN(rc);
366 }
367
368 static int
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370                         const struct lu_fid *lf, const char *name,
371                         struct thandle *handle, struct lustre_capa *capa)
372 {
373         struct dt_object *next = mdd_object_child(pobj);
374         int               rc;
375         ENTRY;
376
377         if (dt_try_as_dir(env, next)) {
378                 rc = next->do_index_ops->dio_insert(env, next,
379                                                     __mdd_fid_rec(env, lf),
380                                                     (const struct dt_key *)name,
381                                                     handle, capa);
382         } else {
383                 rc = -ENOTDIR;
384         }
385         RETURN(rc);
386 }
387
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389                     struct md_object *src_obj, const char *name,
390                     struct md_attr *ma)
391 {
392         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
393         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395         struct mdd_device *mdd = mdo2mdd(src_obj);
396         struct dynlock_handle *dlh;
397         struct thandle *handle;
398         int rc;
399         ENTRY;
400
401         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402         handle = mdd_trans_start(env, mdd);
403         if (IS_ERR(handle))
404                 RETURN(PTR_ERR(handle));
405
406         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
407         if (dlh == NULL)
408                 GOTO(out_trans, rc = -ENOMEM);
409         mdd_write_lock(env, mdd_sobj);
410
411         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
412         if (rc)
413                 GOTO(out_unlock, rc);
414
415         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
416                                      name, handle,
417                                      mdd_object_capa(env, mdd_tobj));
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         mdd_ref_add_internal(env, mdd_sobj, handle);
422
423         *la = ma->ma_attr;
424         la->la_valid = LA_CTIME | LA_MTIME;
425         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
426         if (rc)
427                 GOTO(out_unlock, rc);
428
429         la->la_valid = LA_CTIME;
430         rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
431         EXIT;
432 out_unlock:
433         mdd_write_unlock(env, mdd_sobj);
434         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
435 out_trans:
436         mdd_trans_stop(env, mdd, rc, handle);
437         return rc;
438 }
439
440 static inline void mdd_set_dead_obj(struct mdd_object *obj)
441 {
442         if (obj)
443                 obj->mod_flags |= DEAD_OBJ;
444 }
445
446 /* caller should take a lock before calling */
447 int mdd_finish_unlink(const struct lu_env *env,
448                       struct mdd_object *obj, struct md_attr *ma,
449                       struct thandle *th)
450 {
451         int rc;
452         ENTRY;
453
454         rc = mdd_iattr_get(env, obj, ma);
455         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
456                 /* add new orphan and the object
457                  * will be deleted during the object_put() */
458                 if (__mdd_orphan_add(env, obj, th) == 0)
459                         obj->mod_flags |= ORPHAN_OBJ;
460
461                 mdd_set_dead_obj(obj);
462                 if (obj->mod_count == 0)
463                         rc = mdd_object_kill(env, obj, ma);
464                 else
465                         /* clear MA_LOV | MA_COOKIE, if we do not
466                          * unlink it in case we get it somewhere */
467                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
468         } else
469                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
470
471         RETURN(rc);
472 }
473
474 /*
475  * Check that @dir contains no entries except (possibly) dot and dotdot.
476  *
477  * Returns:
478  *
479  *             0        empty
480  *    -ENOTEMPTY        not empty
481  *           -ve        other error
482  *
483  */
484 static int mdd_dir_is_empty(const struct lu_env *env,
485                             struct mdd_object *dir)
486 {
487         struct dt_it     *it;
488         struct dt_object *obj;
489         struct dt_it_ops *iops;
490         int result;
491         ENTRY;
492
493         obj = mdd_object_child(dir);
494         iops = &obj->do_index_ops->dio_it;
495         it = iops->init(env, obj, 0, BYPASS_CAPA);
496         if (it != NULL) {
497                 result = iops->get(env, it, (const void *)"");
498                 if (result > 0) {
499                         int i;
500                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
501                                 result = iops->next(env, it);
502                         if (result == 0)
503                                 result = -ENOTEMPTY;
504                         else if (result == +1)
505                                 result = 0;
506                 } else if (result == 0)
507                         /*
508                          * Huh? Index contains no zero key?
509                          */
510                         result = -EIO;
511
512                 iops->put(env, it);
513                 iops->fini(env, it);
514         } else
515                 result = -ENOMEM;
516         RETURN(result);
517 }
518
519 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
520                             struct mdd_object *cobj, struct md_attr *ma)
521 {
522         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
523         int rc = 0;
524         ENTRY;
525
526         rc = mdd_may_delete(env, pobj, cobj,
527                             S_ISDIR(ma->ma_attr.la_mode), 1);
528         if (rc)
529                 RETURN(rc);
530
531         if (S_ISDIR(mdd_object_type(cobj))) {
532                 if (dt_try_as_dir(env, dt_cobj))
533                         rc = mdd_dir_is_empty(env, cobj);
534                 else
535                         rc = -ENOTDIR;
536         }
537
538         RETURN(rc);
539 }
540
541 extern atomic_t lvar_enoent_debug;
542 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
543                       struct md_object *cobj, const char *name,
544                       struct md_attr *ma)
545 {
546         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
547         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
548         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
549         struct mdd_device *mdd = mdo2mdd(pobj);
550         struct dynlock_handle *dlh;
551         struct thandle    *handle;
552         int rc, is_dir;
553         ENTRY;
554
555         /*
556          * Check -ENOENT early here because we need to get object type
557          * to calculate credits before transaction start
558          */
559         if (!lu_object_exists(&cobj->mo_lu)) {
560                 LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
561                                 "unlinking as `%s'", name);
562                 RETURN(-ENOENT);
563         }
564
565         LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
566                  PFID(lu_object_fid(&cobj->mo_lu)));
567
568         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
569         if (rc)
570                 RETURN(rc);
571
572         handle = mdd_trans_start(env, mdd);
573         if (IS_ERR(handle))
574                 RETURN(PTR_ERR(handle));
575
576         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
577         if (dlh == NULL)
578                 GOTO(out_trans, rc = -ENOMEM);
579         mdd_write_lock(env, mdd_cobj);
580
581         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
582         if (rc)
583                 GOTO(cleanup, rc);
584
585         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
586         /*
587          * This should be per-thread debugging flag, but
588          */
589         atomic_inc(&lvar_enoent_debug);
590         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
591                                 mdd_object_capa(env, mdd_pobj));
592         atomic_dec(&lvar_enoent_debug);
593         if (rc)
594                 GOTO(cleanup, rc);
595
596         mdd_ref_del_internal(env, mdd_cobj, handle);
597         if (is_dir) {
598                 /* unlink dot */
599                 mdd_ref_del_internal(env, mdd_cobj, handle);
600         }
601
602         *la = ma->ma_attr;
603         la->la_valid = LA_CTIME | LA_MTIME;
604         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
605         if (rc)
606                 GOTO(cleanup, rc);
607
608         la->la_valid = LA_CTIME;
609         rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
610         if (rc)
611                 GOTO(cleanup, rc);
612
613         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
614
615         if (rc == 0)
616                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
617                                    strlen("unlinked"), "unlinked", 0,
618                                    NULL, NULL);
619         EXIT;
620 cleanup:
621         mdd_write_unlock(env, mdd_cobj);
622         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
623 out_trans:
624         mdd_trans_stop(env, mdd, rc, handle);
625         return rc;
626 }
627
628 static int mdd_ni_sanity_check(const struct lu_env *env,
629                                struct md_object *pobj,
630                                const char *name,
631                                const struct lu_fid *fid)
632 {
633         struct mdd_object *obj = md2mdd_obj(pobj);
634         ENTRY;
635
636         /* EEXIST check */
637         if (mdd_is_dead_obj(obj))
638                 RETURN(-ENOENT);
639
640         /* The exist of the name will be checked in _index_insert. */
641         RETURN(mdd_permission_internal_locked(env, obj, NULL,
642                                               MAY_WRITE | MAY_EXEC));
643 }
644
645 /*
646  * Partial operation.
647  */
648 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
649                            const char *name, const struct lu_fid *fid,
650                            int is_dir)
651 {
652         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
653         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
654         struct mdd_device *mdd = mdo2mdd(pobj);
655         struct dynlock_handle *dlh;
656         struct thandle *handle;
657         int rc;
658         ENTRY;
659
660         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
661         handle = mdd_trans_start(env, mdo2mdd(pobj));
662         if (IS_ERR(handle))
663                 RETURN(PTR_ERR(handle));
664
665         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
666         if (dlh == NULL)
667                 GOTO(out_trans, rc = -ENOMEM);
668         rc = mdd_ni_sanity_check(env, pobj, name, fid);
669         if (rc)
670                 GOTO(out_unlock, rc);
671
672         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
673                                 handle, BYPASS_CAPA);
674         if (rc == 0) {
675                 la->la_ctime = la->la_atime = CURRENT_SECONDS;
676                 la->la_valid = LA_ATIME | LA_CTIME;
677                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
678         }
679         EXIT;
680 out_unlock:
681         mdd_pdo_write_unlock(env, mdd_obj, dlh);
682 out_trans:
683         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
684         return rc;
685 }
686
687 static int mdd_nr_sanity_check(const struct lu_env *env,
688                                struct md_object *pobj,
689                                const char *name)
690 {
691         struct mdd_object *obj = md2mdd_obj(pobj);
692         ENTRY;
693
694         /* EEXIST check */
695         if (mdd_is_dead_obj(obj)) {
696                 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
697                 RETURN(-ENOENT);
698         }
699
700         /* Name presense will be checked in _index_delete. */
701         RETURN(mdd_permission_internal_locked(env, obj, NULL,
702                                               MAY_WRITE | MAY_EXEC));
703 }
704
705 /*
706  * Partial operation.
707  */
708 static int mdd_name_remove(const struct lu_env *env,
709                            struct md_object *pobj,
710                            const char *name, int is_dir)
711 {
712         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
713         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
714         struct mdd_device *mdd = mdo2mdd(pobj);
715         struct dynlock_handle *dlh;
716         struct thandle *handle;
717         int rc;
718         ENTRY;
719
720         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
721         handle = mdd_trans_start(env, mdd);
722         if (IS_ERR(handle))
723                 RETURN(PTR_ERR(handle));
724
725         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
726         if (dlh == NULL)
727                 GOTO(out_trans, rc = -ENOMEM);
728         rc = mdd_nr_sanity_check(env, pobj, name);
729         if (rc)
730                 GOTO(out_unlock, rc);
731
732         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
733                                 handle, BYPASS_CAPA);
734         if (rc)
735                 GOTO(out_unlock, rc);
736
737         la->la_ctime = la->la_mtime = CURRENT_SECONDS;
738         la->la_valid = LA_CTIME | LA_MTIME;
739         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
740         EXIT;
741 out_unlock:
742         mdd_pdo_write_unlock(env, mdd_obj, dlh);
743 out_trans:
744         mdd_trans_stop(env, mdd, rc, handle);
745         return rc;
746 }
747
748 static int mdd_rt_sanity_check(const struct lu_env *env,
749                                struct mdd_object *tgt_pobj,
750                                struct mdd_object *tobj,
751                                const struct lu_fid *sfid,
752                                const char *name, struct md_attr *ma)
753 {
754         int rc, src_is_dir;
755         ENTRY;
756
757         /* EEXIST check */
758         if (mdd_is_dead_obj(tgt_pobj))
759                 RETURN(-ENOENT);
760
761         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
762         if (tobj) {
763                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
764                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
765                      mdd_dir_is_empty(env, tobj))
766                                 RETURN(-ENOTEMPTY);
767         } else {
768                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
769         }
770
771         RETURN(rc);
772 }
773
774 static int mdd_rename_tgt(const struct lu_env *env,
775                           struct md_object *pobj, struct md_object *tobj,
776                           const struct lu_fid *lf, const char *name,
777                           struct md_attr *ma)
778 {
779         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
780         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
781         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
782         struct mdd_device *mdd = mdo2mdd(pobj);
783         struct dynlock_handle *dlh;
784         struct thandle *handle;
785         int rc;
786         ENTRY;
787
788         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
789         handle = mdd_trans_start(env, mdd);
790         if (IS_ERR(handle))
791                 RETURN(PTR_ERR(handle));
792
793         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
794         if (dlh == NULL)
795                 GOTO(out_trans, rc = -ENOMEM);
796         if (mdd_tobj)
797                 mdd_write_lock(env, mdd_tobj);
798
799         /* XXX: Rename sanity checking. */
800         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
801         if (rc)
802                 GOTO(cleanup, rc);
803
804         /*
805          * If rename_tgt is called then we should just re-insert name with
806          * correct fid, no need to dec/inc parent nlink if obj is dir.
807          */
808         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
809         if (rc)
810                 GOTO(cleanup, rc);
811
812         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
813                                      BYPASS_CAPA);
814         if (rc)
815                 GOTO(cleanup, rc);
816
817         *la = ma->ma_attr;
818         la->la_valid = LA_CTIME | LA_MTIME;
819         rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
820         if (rc)
821                 GOTO(cleanup, rc);
822
823         if (tobj && lu_object_exists(&tobj->mo_lu)) {
824                 mdd_ref_del_internal(env, mdd_tobj, handle);
825                 la->la_valid = LA_CTIME;
826                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
827         }
828         EXIT;
829 cleanup:
830         if (tobj)
831                 mdd_write_unlock(env, mdd_tobj);
832         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
833 out_trans:
834         mdd_trans_stop(env, mdd, rc, handle);
835         return rc;
836 }
837
838 /*
839  * The permission has been checked when obj created, no need check again.
840  */
841 static int mdd_cd_sanity_check(const struct lu_env *env,
842                                struct mdd_object *obj)
843 {
844         ENTRY;
845
846         /* EEXIST check */
847         if (!obj || mdd_is_dead_obj(obj))
848                 RETURN(-ENOENT);
849
850         RETURN(0);
851
852 }
853
854 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
855                            struct md_object *cobj, const struct md_op_spec *spec,
856                            struct md_attr *ma)
857 {
858         struct mdd_device *mdd = mdo2mdd(cobj);
859         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
860         struct mdd_object *son = md2mdd_obj(cobj);
861         struct lu_attr    *attr = &ma->ma_attr;
862         struct lov_mds_md *lmm = NULL;
863         int                lmm_size = 0;
864         struct thandle    *handle;
865         int                rc;
866         ENTRY;
867
868         rc = mdd_cd_sanity_check(env, son);
869         if (rc)
870                 RETURN(rc);
871
872         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
873             !(spec->sp_cr_flags & FMODE_WRITE))
874                 RETURN(0);
875
876         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
877                             spec, attr);
878         if (rc)
879                 RETURN(rc);
880
881         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
882         handle = mdd_trans_start(env, mdd);
883         if (IS_ERR(handle))
884                 GOTO(out_free, rc = PTR_ERR(handle));
885
886         /*
887          * XXX: Setting the lov ea is not locked but setting the attr is locked?
888          * Should this be fixed?
889          */
890
891         /* Replay creates has objects already */
892         if (spec->u.sp_ea.no_lov_create) {
893                 CDEBUG(D_INFO, "we already have lov ea\n");
894                 rc = mdd_lov_set_md(env, mdd_pobj, son,
895                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
896                                     spec->u.sp_ea.eadatalen, handle, 0);
897         } else
898                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
899                                     lmm_size, handle, 0);
900
901         if (rc == 0)
902                rc = mdd_attr_get_internal_locked(env, son, ma);
903
904         mdd_trans_stop(env, mdd, rc, handle);
905 out_free:
906         /* Finish mdd_lov_create() stuff. */
907         mdd_lov_create_finish(env, mdd, rc);
908         if (lmm)
909                 OBD_FREE(lmm, lmm_size);
910         RETURN(rc);
911 }
912
913 static int
914 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
915              const char *name, struct lu_fid* fid, int mask)
916 {
917         const struct dt_key *key = (const struct dt_key *)name;
918         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
919         struct dt_object    *dir = mdd_object_child(mdd_obj);
920         struct dt_rec       *rec = (struct dt_rec *)fid;
921         struct timeval       start;
922         int rc;
923         ENTRY;
924
925         mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
926         if (mdd_is_dead_obj(mdd_obj))
927                 RETURN(-ENOENT);
928
929         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
930         if (rc == 0)
931                 RETURN(-ESTALE);
932         else if (rc < 0) {
933                 CERROR("Object "DFID" locates on remote server\n",
934                         PFID(mdo2fid(mdd_obj)));
935                 LBUG();
936         }
937
938         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
939         if (rc)
940                 RETURN(rc);
941
942         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
943                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
944                                                    mdd_object_capa(env, mdd_obj));
945                 if (rc == 0)
946                         fid_be_to_cpu(fid, fid);
947         } else
948                 rc = -ENOTDIR;
949
950         mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
951         RETURN(rc);
952 }
953
954 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
955                           struct mdd_object *child, struct md_attr *ma,
956                           struct thandle *handle)
957 {
958         int rc;
959         ENTRY;
960
961         /*
962          * Update attributes for child.
963          *
964          * FIXME:
965          *  (1) the valid bits should be converted between Lustre and Linux;
966          *  (2) maybe, the child attributes should be set in OSD when creation.
967          */
968
969         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
970         if (rc != 0)
971                 RETURN(rc);
972
973         if (S_ISDIR(ma->ma_attr.la_mode)) {
974                 /* Add "." and ".." for newly created dir */
975                 mdd_ref_add_internal(env, child, handle);
976                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
977                                              dot, handle, BYPASS_CAPA);
978                 if (rc == 0) {
979                         rc = __mdd_index_insert_only(env, child, pfid,
980                                                      dotdot, handle,
981                                                      BYPASS_CAPA);
982                         if (rc != 0) {
983                                 int rc2;
984
985                                 rc2 = __mdd_index_delete(env, child, dot, 0,
986                                                          handle, BYPASS_CAPA);
987                                 if (rc2 != 0)
988                                         CERROR("Failure to cleanup after dotdot"
989                                                " creation: %d (%d)\n", rc2, rc);
990                                 else
991                                         mdd_ref_del_internal(env, child, handle);
992                         }
993                 }
994         }
995         RETURN(rc);
996 }
997
998 static int mdd_create_sanity_check(const struct lu_env *env,
999                                    struct md_object *pobj,
1000                                    const char *name,
1001                                    struct md_attr *ma,
1002                                    int lookup)
1003 {
1004         struct mdd_thread_info *info = mdd_env_info(env);
1005         struct lu_attr    *la        = &info->mti_la;
1006         struct lu_fid     *fid       = &info->mti_fid;
1007         struct mdd_object *obj       = md2mdd_obj(pobj);
1008         int rc;
1009         ENTRY;
1010
1011         /* EEXIST check */
1012         if (mdd_is_dead_obj(obj))
1013                 RETURN(-ENOENT);
1014
1015         /*
1016          * In some cases this lookup is not needed - we know before if name
1017          * exists or not because MDT performs lookup for it.
1018          */
1019         /* XXX disable that lookup temporary */
1020         if (0 && lookup) {
1021                 /*
1022                  * Check if the name already exist, though it will be checked in
1023                  * _index_insert also, for avoiding rolling back if exists
1024                  * _index_insert.
1025                  */
1026                 rc = __mdd_lookup_locked(env, pobj, name, fid,
1027                                          MAY_WRITE | MAY_EXEC);
1028                 if (rc != -ENOENT)
1029                         RETURN(rc ? : -EEXIST);
1030         } else {
1031                 /*
1032                  * Check if has WRITE permission for the parent.
1033                  */
1034                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1035                 if (rc)
1036                         RETURN(rc);
1037         }
1038
1039         /* sgid check */
1040         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1041         if (rc != 0)
1042                 RETURN(rc);
1043
1044         if (la->la_mode & S_ISGID) {
1045                 ma->ma_attr.la_gid = la->la_gid;
1046                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1047                         ma->ma_attr.la_mode |= S_ISGID;
1048                         ma->ma_attr.la_valid |= LA_MODE;
1049                 }
1050         }
1051
1052         switch (ma->ma_attr.la_mode & S_IFMT) {
1053         case S_IFREG:
1054         case S_IFDIR:
1055         case S_IFLNK:
1056         case S_IFCHR:
1057         case S_IFBLK:
1058         case S_IFIFO:
1059         case S_IFSOCK:
1060                 rc = 0;
1061                 break;
1062         default:
1063                 rc = -EINVAL;
1064                 break;
1065         }
1066         RETURN(rc);
1067 }
1068
1069 /*
1070  * Create object and insert it into namespace.
1071  */
1072 static int mdd_create(const struct lu_env *env,
1073                       struct md_object *pobj, const char *name,
1074                       struct md_object *child,
1075                       struct md_op_spec *spec,
1076                       struct md_attr* ma)
1077 {
1078         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1079         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1080         struct mdd_object *son = md2mdd_obj(child);
1081         struct mdd_device *mdd = mdo2mdd(pobj);
1082         struct lu_attr    *attr = &ma->ma_attr;
1083         struct lov_mds_md *lmm = NULL;
1084         struct thandle    *handle;
1085         int rc, created = 0, inserted = 0, lmm_size = 0;
1086         struct dynlock_handle *dlh;
1087         struct timeval  start;
1088         ENTRY;
1089
1090         mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1091
1092         /*
1093          * Two operations have to be performed:
1094          *
1095          *  - allocation of new object (->do_create()), and
1096          *
1097          *  - insertion into parent index (->dio_insert()).
1098          *
1099          * Due to locking, operation order is not important, when both are
1100          * successful, *but* error handling cases are quite different:
1101          *
1102          *  - if insertion is done first, and following object creation fails,
1103          *  insertion has to be rolled back, but this operation might fail
1104          *  also leaving us with dangling index entry.
1105          *
1106          *  - if creation is done first, is has to be undone if insertion
1107          *  fails, leaving us with leaked space, which is neither good, nor
1108          *  fatal.
1109          *
1110          * It seems that creation-first is simplest solution, but it is
1111          * sub-optimal in the frequent
1112          *
1113          *         $ mkdir foo
1114          *         $ mkdir foo
1115          *
1116          * case, because second mkdir is bound to create object, only to
1117          * destroy it immediately.
1118          *
1119          * To avoid this follow local file systems that do double lookup:
1120          *
1121          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1122          *
1123          *     1. create            (mdd_object_create_internal())
1124          *
1125          *     2. insert            (__mdd_index_insert(), lookup again)
1126          */
1127
1128         /* Sanity checks before big job. */
1129         rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1130         if (rc)
1131                 RETURN(rc);
1132
1133         /*
1134          * No RPC inside the transaction, so OST objects should be created at
1135          * first.
1136          */
1137         if (S_ISREG(attr->la_mode)) {
1138                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1139                                     spec, attr);
1140                 if (rc)
1141                         RETURN(rc);
1142         }
1143
1144         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1145         handle = mdd_trans_start(env, mdd);
1146         if (IS_ERR(handle))
1147                 GOTO(out_free, rc = PTR_ERR(handle));
1148
1149         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1150         if (dlh == NULL)
1151                 GOTO(out_trans, rc = -ENOMEM);
1152
1153         /*
1154          * XXX: Check that link can be added to the parent in mkdir case.
1155          */
1156
1157         mdd_write_lock(env, son);
1158         rc = mdd_object_create_internal(env, son, ma, handle);
1159         if (rc) {
1160                 mdd_write_unlock(env, son);
1161                 GOTO(cleanup, rc);
1162         }
1163
1164         created = 1;
1165
1166 #ifdef CONFIG_FS_POSIX_ACL
1167         mdd_read_lock(env, mdd_pobj);
1168         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1169         mdd_read_unlock(env, mdd_pobj);
1170         if (rc) {
1171                 mdd_write_unlock(env, son);
1172                 GOTO(cleanup, rc);
1173         } else {
1174                 ma->ma_attr.la_valid |= LA_MODE;
1175         }
1176 #endif
1177
1178         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1179                                    son, ma, handle);
1180         mdd_write_unlock(env, son);
1181         if (rc)
1182                 /*
1183                  * Object has no links, so it will be destroyed when last
1184                  * reference is released. (XXX not now.)
1185                  */
1186                 GOTO(cleanup, rc);
1187
1188         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1189                                 name, S_ISDIR(attr->la_mode), handle,
1190                                 mdd_object_capa(env, mdd_pobj));
1191
1192         if (rc)
1193                 GOTO(cleanup, rc);
1194
1195         inserted = 1;
1196
1197         /* Replay creates has objects already. */
1198         if (spec->u.sp_ea.no_lov_create) {
1199                 CDEBUG(D_INFO, "we already have lov ea\n");
1200                 LASSERT(lmm == NULL);
1201                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1202                 lmm_size = spec->u.sp_ea.eadatalen;
1203         }
1204         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1205         if (rc) {
1206                 CERROR("error on stripe info copy %d \n", rc);
1207                 GOTO(cleanup, rc);
1208         }
1209         if (lmm && lmm_size > 0) {
1210                 /* Set Lov here, do not get lmm again later */
1211                 memcpy(ma->ma_lmm, lmm, lmm_size);
1212                 ma->ma_lmm_size = lmm_size;
1213                 ma->ma_valid |= MA_LOV;
1214         }
1215
1216         if (S_ISLNK(attr->la_mode)) {
1217                 struct dt_object *dt = mdd_object_child(son);
1218                 const char *target_name = spec->u.sp_symname;
1219                 int sym_len = strlen(target_name);
1220                 const struct lu_buf *buf;
1221                 loff_t pos = 0;
1222
1223                 buf = mdd_buf_get_const(env, target_name, sym_len);
1224                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1225                                                 mdd_object_capa(env, son));
1226
1227                 if (rc == sym_len)
1228                         rc = 0;
1229                 else
1230                         GOTO(cleanup, rc = -EFAULT);
1231         }
1232
1233         *la = ma->ma_attr;
1234         la->la_valid = LA_CTIME | LA_MTIME;
1235         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1236         if (rc)
1237                 GOTO(cleanup, rc);
1238
1239         /* Return attr back. */
1240         rc = mdd_attr_get_internal_locked(env, son, ma);
1241         EXIT;
1242 cleanup:
1243         if (rc && created) {
1244                 int rc2 = 0;
1245
1246                 if (inserted) {
1247                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1248                                                  S_ISDIR(attr->la_mode),
1249                                                  handle, BYPASS_CAPA);
1250                         if (rc2)
1251                                 CERROR("error can not cleanup destroy %d\n",
1252                                        rc2);
1253                 }
1254                 if (rc2 == 0) {
1255                         mdd_write_lock(env, son);
1256                         mdd_ref_del_internal(env, son, handle);
1257                         mdd_write_unlock(env, son);
1258                 }
1259         }
1260
1261         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1262 out_trans:
1263         mdd_trans_stop(env, mdd, rc, handle);
1264 out_free:
1265         if (lmm && !spec->u.sp_ea.no_lov_create)
1266                 OBD_FREE(lmm, lmm_size);
1267         /* Finish mdd_lov_create() stuff */
1268         mdd_lov_create_finish(env, mdd, rc);
1269         mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1270         return rc;
1271 }
1272
1273 /*
1274  * Get locks on parents in proper order
1275  * RETURN: < 0 - error, rename_order if successful
1276  */
1277 enum rename_order {
1278         MDD_RN_SAME,
1279         MDD_RN_SRCTGT,
1280         MDD_RN_TGTSRC
1281 };
1282
1283 static int mdd_rename_order(const struct lu_env *env,
1284                             struct mdd_device *mdd,
1285                             struct mdd_object *src_pobj,
1286                             struct mdd_object *tgt_pobj)
1287 {
1288         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1289         int rc;
1290         ENTRY;
1291
1292         if (src_pobj == tgt_pobj)
1293                 RETURN(MDD_RN_SAME);
1294
1295         /* compared the parent child relationship of src_p&tgt_p */
1296         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1297                 rc = MDD_RN_SRCTGT;
1298         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1299                 rc = MDD_RN_TGTSRC;
1300         } else {
1301                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1302                 if (rc == -EREMOTE)
1303                         rc = 0;
1304
1305                 if (rc == 1)
1306                         rc = MDD_RN_TGTSRC;
1307                 else if (rc == 0)
1308                         rc = MDD_RN_SRCTGT;
1309         }
1310
1311         RETURN(rc);
1312 }
1313
1314 static int mdd_rename_sanity_check(const struct lu_env *env,
1315                                    struct mdd_object *src_pobj,
1316                                    struct mdd_object *tgt_pobj,
1317                                    const struct lu_fid *sfid,
1318                                    int src_is_dir,
1319                                    struct mdd_object *tobj)
1320 {
1321         int rc;
1322         ENTRY;
1323
1324         if (mdd_is_dead_obj(src_pobj))
1325                 RETURN(-ENOENT);
1326
1327         /* The sobj maybe on the remote, check parent permission only here */
1328         rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1329                                             MAY_WRITE | MAY_EXEC);
1330         if (rc)
1331                 RETURN(rc);
1332
1333         if (!tobj) {
1334                 rc = mdd_may_create(env, tgt_pobj, NULL,
1335                                     (src_pobj != tgt_pobj));
1336         } else {
1337                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1338                                     (src_pobj != tgt_pobj));
1339                 if (rc == 0)
1340                         if (S_ISDIR(mdd_object_type(tobj))
1341                             && mdd_dir_is_empty(env, tobj))
1342                                 rc = -ENOTEMPTY;
1343         }
1344
1345         RETURN(rc);
1346 }
1347 /* src object can be remote that is why we use only fid and type of object */
1348 static int mdd_rename(const struct lu_env *env,
1349                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1350                       const struct lu_fid *lf, const char *sname,
1351                       struct md_object *tobj, const char *tname,
1352                       struct md_attr *ma)
1353 {
1354         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1355         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1356         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1357         struct mdd_device *mdd = mdo2mdd(src_pobj);
1358         struct mdd_object *mdd_sobj = NULL;
1359         struct mdd_object *mdd_tobj = NULL;
1360         struct dynlock_handle *sdlh, *tdlh;
1361         struct thandle *handle;
1362         int is_dir;
1363         int rc;
1364         ENTRY;
1365
1366         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1367         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1368         if (ma->ma_attr.la_valid & LA_FLAGS &&
1369             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1370                 RETURN(-EPERM);
1371
1372         if (tobj)
1373                 mdd_tobj = md2mdd_obj(tobj);
1374
1375         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1376         handle = mdd_trans_start(env, mdd);
1377         if (IS_ERR(handle))
1378                 RETURN(PTR_ERR(handle));
1379
1380         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1381         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1382         if (rc < 0)
1383                 GOTO(cleanup_unlocked, rc);
1384
1385         /* Get locks in determined order */
1386         if (rc == MDD_RN_SAME) {
1387                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1388                 /* check hashes to determine do we need one lock or two */
1389                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1390                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1391                 else
1392                         tdlh = sdlh;
1393         } else if (rc == MDD_RN_SRCTGT) {
1394                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1395                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1396         } else {
1397                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1398                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1399         }
1400         if (sdlh == NULL || tdlh == NULL)
1401                 GOTO(cleanup, rc = -ENOMEM);
1402
1403         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1404                                      lf, is_dir, mdd_tobj);
1405         if (rc)
1406                 GOTO(cleanup, rc);
1407
1408         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1409                                 mdd_object_capa(env, mdd_spobj));
1410         if (rc)
1411                 GOTO(cleanup, rc);
1412
1413         /*
1414          * Here tobj can be remote one, so we do index_delete unconditionally
1415          * and -ENOENT is allowed.
1416          */
1417         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1418                                 mdd_object_capa(env, mdd_tpobj));
1419         if (rc != 0 && rc != -ENOENT)
1420                 GOTO(cleanup, rc);
1421
1422         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1423                                 mdd_object_capa(env, mdd_tpobj));
1424         if (rc)
1425                 GOTO(cleanup, rc);
1426
1427         *la = ma->ma_attr;
1428         mdd_sobj = mdd_object_find(env, mdd, lf);
1429         if (mdd_sobj) {
1430                 la->la_valid = LA_CTIME;
1431
1432                 /* XXX: How to update ctime for remote sobj? */
1433                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1434                 if (rc)
1435                         GOTO(cleanup, rc);
1436         }
1437         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1438                 mdd_write_lock(env, mdd_tobj);
1439                 mdd_ref_del_internal(env, mdd_tobj, handle);
1440
1441                 /* Remove dot reference. */
1442                 if (is_dir)
1443                         mdd_ref_del_internal(env, mdd_tobj, handle);
1444
1445                 la->la_valid = LA_CTIME;
1446                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1447                 if (rc)
1448                         GOTO(cleanup, rc);
1449
1450                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1451                 mdd_write_unlock(env, mdd_tobj);
1452                 if (rc)
1453                         GOTO(cleanup, rc);
1454         }
1455
1456         la->la_valid = LA_CTIME | LA_MTIME;
1457         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1458         if (rc)
1459                 GOTO(cleanup, rc);
1460
1461         if (mdd_spobj != mdd_tpobj) {
1462                 la->la_valid = LA_CTIME | LA_MTIME;
1463                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1464                                                   handle, 0);
1465         }
1466
1467         EXIT;
1468 cleanup:
1469         if (likely(tdlh) && sdlh != tdlh)
1470                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1471         if (likely(sdlh))
1472                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1473 cleanup_unlocked:
1474         mdd_trans_stop(env, mdd, rc, handle);
1475         if (mdd_sobj)
1476                 mdd_object_put(env, mdd_sobj);
1477         return rc;
1478 }
1479
1480 struct md_dir_operations mdd_dir_ops = {
1481         .mdo_is_subdir     = mdd_is_subdir,
1482         .mdo_lookup        = mdd_lookup,
1483         .mdo_create        = mdd_create,
1484         .mdo_rename        = mdd_rename,
1485         .mdo_link          = mdd_link,
1486         .mdo_unlink        = mdd_unlink,
1487         .mdo_name_insert   = mdd_name_insert,
1488         .mdo_name_remove   = mdd_name_remove,
1489         .mdo_rename_tgt    = mdd_rename_tgt,
1490         .mdo_create_data   = mdd_create_data
1491 };