Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid, struct md_op_spec *spec)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check)
200                 rc = mdd_permission_internal_locked(env, pobj, NULL,
201                                                     MAY_WRITE | MAY_EXEC);
202
203         RETURN(rc);
204 }
205
206 static inline int mdd_is_sticky(const struct lu_env *env,
207                                 struct mdd_object *pobj,
208                                 struct mdd_object *cobj)
209 {
210         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211         struct md_ucred *uc = md_ucred(env);
212         int rc;
213
214         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
215         if (rc) {
216                 return rc;
217         } else if (tmp_la->la_uid == uc->mu_fsuid) {
218                 return 0;
219         } else {
220                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221                 if (rc)
222                         return rc;
223                 else if (!(tmp_la->la_mode & S_ISVTX) ||
224                          (tmp_la->la_uid == uc->mu_fsuid))
225                         return 0;
226                 else
227                         return !mdd_capable(uc, CAP_FOWNER);
228         }
229 }
230
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233                           struct mdd_object *pobj,
234                           struct mdd_object *cobj,
235                           int is_dir, int need_check)
236 {
237         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
238         int rc = 0;
239         ENTRY;
240
241         LASSERT(cobj);
242
243         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
244                 RETURN(-ENOENT);
245
246         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
247                 RETURN(-EPERM);
248
249         if (is_dir) {
250                 if (!S_ISDIR(mdd_object_type(cobj)))
251                         RETURN(-ENOTDIR);
252
253                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
254                         RETURN(-EBUSY);
255
256         } else if (S_ISDIR(mdd_object_type(cobj))) {
257                 RETURN(-EISDIR);
258         }
259
260         if (pobj) {
261                 if (mdd_is_dead_obj(pobj))
262                         RETURN(-ENOENT);
263
264                 if (mdd_is_sticky(env, pobj, cobj))
265                         RETURN(-EPERM);
266
267                 if (need_check)
268                         rc = mdd_permission_internal_locked(env, pobj, NULL,
269                                                      MAY_WRITE | MAY_EXEC);
270         }
271         RETURN(rc);
272 }
273
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275                           struct mdd_object *src_obj)
276 {
277         int rc = 0;
278         ENTRY;
279
280         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
281                 RETURN(-EPERM);
282
283         if (S_ISDIR(mdd_object_type(src_obj)))
284                 RETURN(-EPERM);
285
286         LASSERT(src_obj != tgt_obj);
287         if (tgt_obj) {
288                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
289                 if (rc)
290                         RETURN(rc);
291         }
292
293         RETURN(rc);
294 }
295
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297                                    const struct lu_fid *fid)
298 {
299         struct mdd_thread_info *info = mdd_env_info(env);
300
301         fid_cpu_to_be(&info->mti_fid2, fid);
302         return (const struct dt_rec *)&info->mti_fid2;
303 }
304
305
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308                               const struct lu_fid *lf, const char *name, int is_dir,
309                               struct thandle *handle, struct lustre_capa *capa)
310 {
311         struct dt_object *next = mdd_object_child(pobj);
312         struct timeval    start;
313         int               rc;
314         ENTRY;
315
316         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317                                LPROC_MDD_INDEX_INSERT);
318         if (dt_try_as_dir(env, next)) {
319                 rc = next->do_index_ops->dio_insert(env, next,
320                                                     __mdd_fid_rec(env, lf),
321                                                     (const struct dt_key *)name,
322                                                     handle, capa);
323         } else {
324                 rc = -ENOTDIR;
325         }
326
327         if (rc == 0) {
328                 if (is_dir) {
329                         mdd_write_lock(env, pobj);
330                         mdd_ref_add_internal(env, pobj, handle);
331                         mdd_write_unlock(env, pobj);
332                 }
333         }
334         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335                              LPROC_MDD_INDEX_INSERT);
336         RETURN(rc);
337 }
338
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340                               const char *name, int is_dir, struct thandle *handle,
341                               struct lustre_capa *capa)
342 {
343         struct dt_object *next = mdd_object_child(pobj);
344         struct timeval    start;
345         int               rc;
346         ENTRY;
347
348         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349                                LPROC_MDD_INDEX_DELETE);
350
351         if (dt_try_as_dir(env, next)) {
352                 rc = next->do_index_ops->dio_delete(env, next,
353                                                     (struct dt_key *)name,
354                                                     handle, capa);
355                 if (rc == 0 && is_dir) {
356                         mdd_write_lock(env, pobj);
357                         mdd_ref_del_internal(env, pobj, handle);
358                         mdd_write_unlock(env, pobj);
359                 }
360         } else
361                 rc = -ENOTDIR;
362
363         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364                              LPROC_MDD_INDEX_DELETE);
365         RETURN(rc);
366 }
367
368 static int
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370                         const struct lu_fid *lf, const char *name,
371                         struct thandle *handle, struct lustre_capa *capa)
372 {
373         struct dt_object *next = mdd_object_child(pobj);
374         int               rc;
375         ENTRY;
376
377         if (dt_try_as_dir(env, next)) {
378                 rc = next->do_index_ops->dio_insert(env, next,
379                                                     __mdd_fid_rec(env, lf),
380                                                     (const struct dt_key *)name,
381                                                     handle, capa);
382         } else {
383                 rc = -ENOTDIR;
384         }
385         RETURN(rc);
386 }
387
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389                     struct md_object *src_obj, const char *name,
390                     struct md_attr *ma)
391 {
392         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
393         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395         struct mdd_device *mdd = mdo2mdd(src_obj);
396         struct dynlock_handle *dlh;
397         struct thandle *handle;
398         int rc;
399         ENTRY;
400
401         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402         handle = mdd_trans_start(env, mdd);
403         if (IS_ERR(handle))
404                 RETURN(PTR_ERR(handle));
405
406         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
407         if (dlh == NULL)
408                 GOTO(out_trans, rc = -ENOMEM);
409         mdd_write_lock(env, mdd_sobj);
410
411         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
412         if (rc)
413                 GOTO(out_unlock, rc);
414
415         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
416                                      name, handle,
417                                      mdd_object_capa(env, mdd_tobj));
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         mdd_ref_add_internal(env, mdd_sobj, handle);
422
423         *la = ma->ma_attr;
424         la->la_valid = LA_CTIME | LA_MTIME;
425         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
426         if (rc)
427                 GOTO(out_unlock, rc);
428
429         la->la_valid = LA_CTIME;
430         rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
431         EXIT;
432 out_unlock:
433         mdd_write_unlock(env, mdd_sobj);
434         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
435 out_trans:
436         mdd_trans_stop(env, mdd, rc, handle);
437         return rc;
438 }
439
440 /* caller should take a lock before calling */
441 int mdd_finish_unlink(const struct lu_env *env,
442                       struct mdd_object *obj, struct md_attr *ma,
443                       struct thandle *th)
444 {
445         int rc;
446         ENTRY;
447
448         rc = mdd_iattr_get(env, obj, ma);
449         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
450                 /* add new orphan and the object
451                  * will be deleted during the object_put() */
452                 if (__mdd_orphan_add(env, obj, th) == 0)
453                         obj->mod_flags |= ORPHAN_OBJ;
454
455                 if (obj->mod_count == 0)
456                         rc = mdd_object_kill(env, obj, ma);
457                 else
458                         /* clear MA_LOV | MA_COOKIE, if we do not
459                          * unlink it in case we get it somewhere */
460                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
461         } else
462                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
463
464         RETURN(rc);
465 }
466
467 /*
468  * Check that @dir contains no entries except (possibly) dot and dotdot.
469  *
470  * Returns:
471  *
472  *             0        empty
473  *    -ENOTEMPTY        not empty
474  *           -ve        other error
475  *
476  */
477 static int mdd_dir_is_empty(const struct lu_env *env,
478                             struct mdd_object *dir)
479 {
480         struct dt_it     *it;
481         struct dt_object *obj;
482         struct dt_it_ops *iops;
483         int result;
484         ENTRY;
485
486         obj = mdd_object_child(dir);
487         iops = &obj->do_index_ops->dio_it;
488         it = iops->init(env, obj, 0, BYPASS_CAPA);
489         if (it != NULL) {
490                 result = iops->get(env, it, (const void *)"");
491                 if (result > 0) {
492                         int i;
493                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
494                                 result = iops->next(env, it);
495                         if (result == 0)
496                                 result = -ENOTEMPTY;
497                         else if (result == +1)
498                                 result = 0;
499                 } else if (result == 0)
500                         /*
501                          * Huh? Index contains no zero key?
502                          */
503                         result = -EIO;
504
505                 iops->put(env, it);
506                 iops->fini(env, it);
507         } else
508                 result = -ENOMEM;
509         RETURN(result);
510 }
511
512 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
513                             struct mdd_object *cobj, struct md_attr *ma)
514 {
515         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
516         int rc = 0;
517         ENTRY;
518
519         rc = mdd_may_delete(env, pobj, cobj,
520                             S_ISDIR(ma->ma_attr.la_mode), 1);
521         if (rc)
522                 RETURN(rc);
523
524         if (S_ISDIR(mdd_object_type(cobj))) {
525                 if (dt_try_as_dir(env, dt_cobj))
526                         rc = mdd_dir_is_empty(env, cobj);
527                 else
528                         rc = -ENOTDIR;
529         }
530
531         RETURN(rc);
532 }
533
534 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
535                       struct md_object *cobj, const char *name,
536                       struct md_attr *ma)
537 {
538         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
539         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
540         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
541         struct mdd_device *mdd = mdo2mdd(pobj);
542         struct dynlock_handle *dlh;
543         struct thandle    *handle;
544         int rc, is_dir;
545         ENTRY;
546
547         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
548         if (rc)
549                 RETURN(rc);
550
551         handle = mdd_trans_start(env, mdd);
552         if (IS_ERR(handle))
553                 RETURN(PTR_ERR(handle));
554
555         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
556         if (dlh == NULL)
557                 GOTO(out_trans, rc = -ENOMEM);
558         mdd_write_lock(env, mdd_cobj);
559
560         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
561         if (rc)
562                 GOTO(cleanup, rc);
563
564         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
565         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
566                                 mdd_object_capa(env, mdd_pobj));
567         if (rc)
568                 GOTO(cleanup, rc);
569
570         mdd_ref_del_internal(env, mdd_cobj, handle);
571         if (is_dir) {
572                 /* unlink dot */
573                 mdd_ref_del_internal(env, mdd_cobj, handle);
574         }
575
576         *la = ma->ma_attr;
577         la->la_valid = LA_CTIME | LA_MTIME;
578         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
579         if (rc)
580                 GOTO(cleanup, rc);
581
582         la->la_valid = LA_CTIME;
583         rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
584         if (rc)
585                 GOTO(cleanup, rc);
586
587         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
588
589         if (rc == 0)
590                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
591                                    strlen("unlinked"), "unlinked", 0,
592                                    NULL, NULL);
593         EXIT;
594 cleanup:
595         mdd_write_unlock(env, mdd_cobj);
596         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
597 out_trans:
598         mdd_trans_stop(env, mdd, rc, handle);
599         return rc;
600 }
601
602 static int mdd_ni_sanity_check(const struct lu_env *env,
603                                struct md_object *pobj,
604                                const char *name,
605                                const struct lu_fid *fid)
606 {
607         struct mdd_object *obj = md2mdd_obj(pobj);
608         ENTRY;
609
610         /* EEXIST check */
611         if (mdd_is_dead_obj(obj))
612                 RETURN(-ENOENT);
613
614         /* The exist of the name will be checked in _index_insert. */
615         RETURN(mdd_permission_internal_locked(env, obj, NULL,
616                                               MAY_WRITE | MAY_EXEC));
617 }
618
619 /*
620  * Partial operation.
621  */
622 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
623                            const char *name, const struct lu_fid *fid,
624                            int is_dir)
625 {
626         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
627         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
628         struct mdd_device *mdd = mdo2mdd(pobj);
629         struct dynlock_handle *dlh;
630         struct thandle *handle;
631         int rc;
632         ENTRY;
633
634         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
635         handle = mdd_trans_start(env, mdo2mdd(pobj));
636         if (IS_ERR(handle))
637                 RETURN(PTR_ERR(handle));
638
639         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
640         if (dlh == NULL)
641                 GOTO(out_trans, rc = -ENOMEM);
642         rc = mdd_ni_sanity_check(env, pobj, name, fid);
643         if (rc)
644                 GOTO(out_unlock, rc);
645
646         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
647                                 handle, BYPASS_CAPA);
648
649         la->la_ctime = la->la_atime = CURRENT_SECONDS;
650         la->la_valid = LA_ATIME | LA_CTIME;
651         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
652         EXIT;
653 out_unlock:
654         mdd_pdo_write_unlock(env, mdd_obj, dlh);
655 out_trans:
656         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
657         return rc;
658 }
659
660 static int mdd_nr_sanity_check(const struct lu_env *env,
661                                struct md_object *pobj,
662                                const char *name)
663 {
664         struct mdd_object *obj = md2mdd_obj(pobj);
665         ENTRY;
666
667         /* EEXIST check */
668         if (mdd_is_dead_obj(obj)) {
669                 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
670                 RETURN(-ENOENT);
671         }
672
673         /* Name presense will be checked in _index_delete. */
674         RETURN(mdd_permission_internal_locked(env, obj, NULL,
675                                               MAY_WRITE | MAY_EXEC));
676 }
677
678 /*
679  * Partial operation.
680  */
681 static int mdd_name_remove(const struct lu_env *env,
682                            struct md_object *pobj,
683                            const char *name, int is_dir)
684 {
685         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
686         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
687         struct mdd_device *mdd = mdo2mdd(pobj);
688         struct dynlock_handle *dlh;
689         struct thandle *handle;
690         int rc;
691         ENTRY;
692
693         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
694         handle = mdd_trans_start(env, mdd);
695         if (IS_ERR(handle))
696                 RETURN(PTR_ERR(handle));
697
698         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
699         if (dlh == NULL)
700                 GOTO(out_trans, rc = -ENOMEM);
701         rc = mdd_nr_sanity_check(env, pobj, name);
702         if (rc)
703                 GOTO(out_unlock, rc);
704
705         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
706                                 handle, BYPASS_CAPA);
707         if (rc)
708                 GOTO(out_unlock, rc);
709
710         la->la_ctime = la->la_mtime = CURRENT_SECONDS;
711         la->la_valid = LA_CTIME | LA_MTIME;
712         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
713         EXIT;
714 out_unlock:
715         mdd_pdo_write_unlock(env, mdd_obj, dlh);
716 out_trans:
717         mdd_trans_stop(env, mdd, rc, handle);
718         return rc;
719 }
720
721 static int mdd_rt_sanity_check(const struct lu_env *env,
722                                struct mdd_object *tgt_pobj,
723                                struct mdd_object *tobj,
724                                const struct lu_fid *sfid,
725                                const char *name, struct md_attr *ma)
726 {
727         int rc, src_is_dir;
728         ENTRY;
729
730         /* EEXIST check */
731         if (mdd_is_dead_obj(tgt_pobj))
732                 RETURN(-ENOENT);
733
734         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
735         if (tobj) {
736                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
737                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
738                      mdd_dir_is_empty(env, tobj))
739                                 RETURN(-ENOTEMPTY);
740         } else {
741                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
742         }
743
744         RETURN(rc);
745 }
746
747 static int mdd_rename_tgt(const struct lu_env *env,
748                           struct md_object *pobj, struct md_object *tobj,
749                           const struct lu_fid *lf, const char *name,
750                           struct md_attr *ma)
751 {
752         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
753         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
754         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
755         struct mdd_device *mdd = mdo2mdd(pobj);
756         struct dynlock_handle *dlh;
757         struct thandle *handle;
758         int rc;
759         ENTRY;
760
761         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
762         handle = mdd_trans_start(env, mdd);
763         if (IS_ERR(handle))
764                 RETURN(PTR_ERR(handle));
765
766         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
767         if (dlh == NULL)
768                 GOTO(out_trans, rc = -ENOMEM);
769         if (mdd_tobj)
770                 mdd_write_lock(env, mdd_tobj);
771
772         /* XXX: Rename sanity checking. */
773         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
774         if (rc)
775                 GOTO(cleanup, rc);
776
777         /*
778          * If rename_tgt is called then we should just re-insert name with
779          * correct fid, no need to dec/inc parent nlink if obj is dir.
780          */
781         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
782         if (rc)
783                 GOTO(cleanup, rc);
784
785         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
786                                      BYPASS_CAPA);
787         if (rc)
788                 GOTO(cleanup, rc);
789
790         *la = ma->ma_attr;
791         la->la_valid = LA_CTIME | LA_MTIME;
792         rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
793         if (rc)
794                 GOTO(cleanup, rc);
795
796         if (tobj && lu_object_exists(&tobj->mo_lu)) {
797                 mdd_ref_del_internal(env, mdd_tobj, handle);
798                 la->la_valid = LA_CTIME;
799                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
800         }
801         EXIT;
802 cleanup:
803         if (tobj)
804                 mdd_write_unlock(env, mdd_tobj);
805         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
806 out_trans:
807         mdd_trans_stop(env, mdd, rc, handle);
808         return rc;
809 }
810
811 /*
812  * The permission has been checked when obj created, no need check again.
813  */
814 static int mdd_cd_sanity_check(const struct lu_env *env,
815                                struct mdd_object *obj)
816 {
817         ENTRY;
818
819         /* EEXIST check */
820         if (!obj || mdd_is_dead_obj(obj))
821                 RETURN(-ENOENT);
822
823         RETURN(0);
824
825 }
826
827 static int mdd_create_data(const struct lu_env *env,
828                            struct md_object *pobj, struct md_object *cobj,
829                            const struct md_op_spec *spec,
830                            struct md_attr *ma)
831 {
832         struct mdd_device *mdd = mdo2mdd(cobj);
833         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
834         struct mdd_object *son = md2mdd_obj(cobj);
835         struct lu_attr    *attr = &ma->ma_attr;
836         struct lov_mds_md *lmm = NULL;
837         int                lmm_size = 0;
838         struct thandle    *handle;
839         int                rc;
840         ENTRY;
841
842         rc = mdd_cd_sanity_check(env, son);
843         if (rc)
844                 RETURN(rc);
845
846         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
847                         !(spec->sp_cr_flags & FMODE_WRITE))
848                 RETURN(0);
849         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
850                             attr);
851         if (rc)
852                 RETURN(rc);
853
854         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
855         handle = mdd_trans_start(env, mdd);
856         if (IS_ERR(handle))
857                 RETURN(rc = PTR_ERR(handle));
858
859         /*
860          * XXX: Setting the lov ea is not locked but setting the attr is locked?
861          * Should this be fixed?
862          */
863
864         /* Replay creates has objects already */
865         if (spec->u.sp_ea.no_lov_create) {
866                 CDEBUG(D_INFO, "we already have lov ea\n");
867                 rc = mdd_lov_set_md(env, mdd_pobj, son,
868                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
869                                     spec->u.sp_ea.eadatalen, handle, 0);
870         } else
871                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
872                                     lmm_size, handle, 0);
873
874         if (rc == 0)
875                rc = mdd_attr_get_internal_locked(env, son, ma);
876
877         /* Finish mdd_lov_create() stuff. */
878         mdd_lov_create_finish(env, mdd, rc);
879         mdd_trans_stop(env, mdd, rc, handle);
880         if (lmm)
881                 OBD_FREE(lmm, lmm_size);
882         RETURN(rc);
883 }
884
885 static int
886 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
887              const char *name, struct lu_fid* fid, int mask)
888 {
889         const struct dt_key *key = (const struct dt_key *)name;
890         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
891         struct dt_object    *dir = mdd_object_child(mdd_obj);
892         struct dt_rec       *rec = (struct dt_rec *)fid;
893         struct timeval       start;
894         int rc;
895         ENTRY;
896
897         mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
898         if (mdd_is_dead_obj(mdd_obj))
899                 RETURN(-ENOENT);
900
901         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
902         if (rc == 0)
903                 RETURN(-ESTALE);
904         else if (rc < 0) {
905                 CERROR("Object "DFID" locates on remote server\n",
906                         PFID(mdo2fid(mdd_obj)));
907                 LBUG();
908         }
909
910         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
911         if (rc)
912                 RETURN(rc);
913
914         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
915                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
916                                                    mdd_object_capa(env, mdd_obj));
917                 if (rc == 0)
918                         fid_be_to_cpu(fid, fid);
919         } else
920                 rc = -ENOTDIR;
921
922         mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
923         RETURN(rc);
924 }
925
926 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
927                           struct mdd_object *child, struct md_attr *ma,
928                           struct thandle *handle)
929 {
930         int rc;
931         ENTRY;
932
933         /*
934          * Update attributes for child.
935          *
936          * FIXME:
937          *  (1) the valid bits should be converted between Lustre and Linux;
938          *  (2) maybe, the child attributes should be set in OSD when creation.
939          */
940
941         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
942         if (rc != 0)
943                 RETURN(rc);
944
945         if (S_ISDIR(ma->ma_attr.la_mode)) {
946                 /* Add "." and ".." for newly created dir */
947                 mdd_ref_add_internal(env, child, handle);
948                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
949                                              dot, handle, BYPASS_CAPA);
950                 if (rc == 0) {
951                         rc = __mdd_index_insert_only(env, child, pfid,
952                                                      dotdot, handle,
953                                                      BYPASS_CAPA);
954                         if (rc != 0) {
955                                 int rc2;
956
957                                 rc2 = __mdd_index_delete(env, child, dot, 0,
958                                                          handle, BYPASS_CAPA);
959                                 if (rc2 != 0)
960                                         CERROR("Failure to cleanup after dotdot"
961                                                " creation: %d (%d)\n", rc2, rc);
962                                 else
963                                         mdd_ref_del_internal(env, child, handle);
964                         }
965                 }
966         }
967         RETURN(rc);
968 }
969
970 static int mdd_create_sanity_check(const struct lu_env *env,
971                                    struct md_object *pobj,
972                                    const char *name,
973                                    struct md_attr *ma,
974                                    int lookup)
975 {
976         struct mdd_thread_info *info = mdd_env_info(env);
977         struct lu_attr    *la        = &info->mti_la;
978         struct lu_fid     *fid       = &info->mti_fid;
979         struct mdd_object *obj       = md2mdd_obj(pobj);
980         int rc;
981         ENTRY;
982
983         /* EEXIST check */
984         if (mdd_is_dead_obj(obj))
985                 RETURN(-ENOENT);
986
987         /*
988          * In some cases this lookup is not needed - we know before that if name
989          * exists or not.
990          */
991         if (lookup) {
992                 /*
993                  * Check if the name already exist, though it will be checked in
994                  * _index_insert also, for avoiding rolling back if exists
995                  * _index_insert.
996                  */
997                 rc = __mdd_lookup_locked(env, pobj, name, fid,
998                                          MAY_WRITE | MAY_EXEC);
999                 if (rc != -ENOENT)
1000                         RETURN(rc ? : -EEXIST);
1001         } else {
1002                 /*
1003                  * Check if has WRITE permission for the parent.
1004                  */
1005                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1006                 if (rc)
1007                         RETURN(rc);
1008         }
1009         
1010         /* sgid check */
1011         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1012         if (rc != 0)
1013                 RETURN(rc);
1014
1015         if (la->la_mode & S_ISGID) {
1016                 ma->ma_attr.la_gid = la->la_gid;
1017                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1018                         ma->ma_attr.la_mode |= S_ISGID;
1019                         ma->ma_attr.la_valid |= LA_MODE;
1020                 }
1021         }
1022
1023         switch (ma->ma_attr.la_mode & S_IFMT) {
1024         case S_IFREG:
1025         case S_IFDIR:
1026         case S_IFLNK:
1027         case S_IFCHR:
1028         case S_IFBLK:
1029         case S_IFIFO:
1030         case S_IFSOCK:
1031                 rc = 0;
1032                 break;
1033         default:
1034                 rc = -EINVAL;
1035                 break;
1036         }
1037         RETURN(rc);
1038 }
1039
1040 /*
1041  * Create object and insert it into namespace.
1042  */
1043 static int mdd_create(const struct lu_env *env,
1044                       struct md_object *pobj, const char *name,
1045                       struct md_object *child,
1046                       struct md_op_spec *spec,
1047                       struct md_attr* ma)
1048 {
1049         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1050         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1051         struct mdd_object *son = md2mdd_obj(child);
1052         struct mdd_device *mdd = mdo2mdd(pobj);
1053         struct lu_attr    *attr = &ma->ma_attr;
1054         struct lov_mds_md *lmm = NULL;
1055         struct thandle    *handle;
1056         int rc, created = 0, inserted = 0, lmm_size = 0;
1057         struct dynlock_handle *dlh;
1058         struct timeval  start;
1059         ENTRY;
1060
1061         mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1062
1063         /*
1064          * Two operations have to be performed:
1065          *
1066          *  - allocation of new object (->do_create()), and
1067          *
1068          *  - insertion into parent index (->dio_insert()).
1069          *
1070          * Due to locking, operation order is not important, when both are
1071          * successful, *but* error handling cases are quite different:
1072          *
1073          *  - if insertion is done first, and following object creation fails,
1074          *  insertion has to be rolled back, but this operation might fail
1075          *  also leaving us with dangling index entry.
1076          *
1077          *  - if creation is done first, is has to be undone if insertion
1078          *  fails, leaving us with leaked space, which is neither good, nor
1079          *  fatal.
1080          *
1081          * It seems that creation-first is simplest solution, but it is
1082          * sub-optimal in the frequent
1083          *
1084          *         $ mkdir foo
1085          *         $ mkdir foo
1086          *
1087          * case, because second mkdir is bound to create object, only to
1088          * destroy it immediately.
1089          *
1090          * To avoid this follow local file systems that do double lookup:
1091          *
1092          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1093          *
1094          *     1. create            (mdd_object_create_internal())
1095          *
1096          *     2. insert            (__mdd_index_insert(), lookup again)
1097          */
1098
1099         /* Sanity checks before big job. */
1100         rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1101         if (rc)
1102                 RETURN(rc);
1103
1104         /*
1105          * No RPC inside the transaction, so OST objects should be created at
1106          * first.
1107          */
1108         if (S_ISREG(attr->la_mode)) {
1109                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1110                                     spec, attr);
1111                 if (rc)
1112                         RETURN(rc);
1113         }
1114
1115         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1116         handle = mdd_trans_start(env, mdd);
1117         if (IS_ERR(handle))
1118                 GOTO(out_free, rc = PTR_ERR(handle));
1119
1120         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1121         if (dlh == NULL)
1122                 GOTO(out_trans, rc = -ENOMEM);
1123
1124         /*
1125          * XXX: Check that link can be added to the parent in mkdir case.
1126          */
1127
1128         mdd_write_lock(env, son);
1129         rc = mdd_object_create_internal(env, son, ma, handle);
1130         if (rc) {
1131                 mdd_write_unlock(env, son);
1132                 GOTO(cleanup, rc);
1133         }
1134
1135         created = 1;
1136
1137 #ifdef CONFIG_FS_POSIX_ACL
1138         mdd_read_lock(env, mdd_pobj);
1139         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1140         mdd_read_unlock(env, mdd_pobj);
1141         if (rc) {
1142                 mdd_write_unlock(env, son);
1143                 GOTO(cleanup, rc);
1144         } else {
1145                 ma->ma_attr.la_valid |= LA_MODE;
1146         }
1147 #endif
1148
1149         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1150                                    son, ma, handle);
1151         mdd_write_unlock(env, son);
1152         if (rc)
1153                 /*
1154                  * Object has no links, so it will be destroyed when last
1155                  * reference is released. (XXX not now.)
1156                  */
1157                 GOTO(cleanup, rc);
1158
1159         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1160                                 name, S_ISDIR(attr->la_mode), handle,
1161                                 mdd_object_capa(env, mdd_pobj));
1162
1163         if (rc)
1164                 GOTO(cleanup, rc);
1165
1166         inserted = 1;
1167
1168         /* Replay creates has objects already. */
1169         if (spec->u.sp_ea.no_lov_create) {
1170                 CDEBUG(D_INFO, "we already have lov ea\n");
1171                 LASSERT(lmm == NULL);
1172                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1173                 lmm_size = spec->u.sp_ea.eadatalen;
1174         }
1175         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1176         if (rc) {
1177                 CERROR("error on stripe info copy %d \n", rc);
1178                 GOTO(cleanup, rc);
1179         }
1180         if (lmm && lmm_size > 0) {
1181                 /* Set Lov here, do not get lmm again later */
1182                 memcpy(ma->ma_lmm, lmm, lmm_size);
1183                 ma->ma_lmm_size = lmm_size;
1184                 ma->ma_valid |= MA_LOV;
1185         }
1186
1187         if (S_ISLNK(attr->la_mode)) {
1188                 struct dt_object *dt = mdd_object_child(son);
1189                 const char *target_name = spec->u.sp_symname;
1190                 int sym_len = strlen(target_name);
1191                 const struct lu_buf *buf;
1192                 loff_t pos = 0;
1193
1194                 buf = mdd_buf_get_const(env, target_name, sym_len);
1195                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1196                                                 mdd_object_capa(env, son));
1197
1198                 if (rc == sym_len)
1199                         rc = 0;
1200                 else
1201                         GOTO(cleanup, rc = -EFAULT);
1202         }
1203
1204         *la = ma->ma_attr;
1205         la->la_valid = LA_CTIME | LA_MTIME;
1206         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1207         if (rc)
1208                 GOTO(cleanup, rc);
1209
1210         /* Return attr back. */
1211         rc = mdd_attr_get_internal_locked(env, son, ma);
1212         EXIT;
1213 cleanup:
1214         if (rc && created) {
1215                 int rc2 = 0;
1216
1217                 if (inserted) {
1218                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1219                                                  S_ISDIR(attr->la_mode),
1220                                                  handle, BYPASS_CAPA);
1221                         if (rc2)
1222                                 CERROR("error can not cleanup destroy %d\n",
1223                                        rc2);
1224                 }
1225                 if (rc2 == 0) {
1226                         mdd_write_lock(env, son);
1227                         mdd_ref_del_internal(env, son, handle);
1228                         mdd_write_unlock(env, son);
1229                 }
1230         }
1231
1232         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1233 out_trans:
1234         mdd_trans_stop(env, mdd, rc, handle);
1235 out_free:
1236         if (lmm && !spec->u.sp_ea.no_lov_create)
1237                 OBD_FREE(lmm, lmm_size);
1238         /* Finish mdd_lov_create() stuff */
1239         mdd_lov_create_finish(env, mdd, rc);
1240         mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1241         return rc;
1242 }
1243
1244 /*
1245  * Get locks on parents in proper order
1246  * RETURN: < 0 - error, rename_order if successful
1247  */
1248 enum rename_order {
1249         MDD_RN_SAME,
1250         MDD_RN_SRCTGT,
1251         MDD_RN_TGTSRC
1252 };
1253
1254 static int mdd_rename_order(const struct lu_env *env,
1255                             struct mdd_device *mdd,
1256                             struct mdd_object *src_pobj,
1257                             struct mdd_object *tgt_pobj)
1258 {
1259         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1260         int rc;
1261         ENTRY;
1262
1263         if (src_pobj == tgt_pobj)
1264                 RETURN(MDD_RN_SAME);
1265
1266         /* compared the parent child relationship of src_p&tgt_p */
1267         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1268                 rc = MDD_RN_SRCTGT;
1269         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1270                 rc = MDD_RN_TGTSRC;
1271         } else {
1272                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1273                 if (rc == -EREMOTE)
1274                         rc = 0;
1275
1276                 if (rc == 1)
1277                         rc = MDD_RN_TGTSRC;
1278                 else if (rc == 0)
1279                         rc = MDD_RN_SRCTGT;
1280         }
1281
1282         RETURN(rc);
1283 }
1284
1285 static int mdd_rename_sanity_check(const struct lu_env *env,
1286                                    struct mdd_object *src_pobj,
1287                                    struct mdd_object *tgt_pobj,
1288                                    const struct lu_fid *sfid,
1289                                    int src_is_dir,
1290                                    struct mdd_object *tobj)
1291 {
1292         int rc;
1293         ENTRY;
1294
1295         if (mdd_is_dead_obj(src_pobj))
1296                 RETURN(-ENOENT);
1297
1298         /* The sobj maybe on the remote, check parent permission only here */
1299         rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1300                                             MAY_WRITE | MAY_EXEC);
1301         if (rc)
1302                 RETURN(rc);
1303
1304         if (!tobj) {
1305                 rc = mdd_may_create(env, tgt_pobj, NULL,
1306                                     (src_pobj != tgt_pobj));
1307         } else {
1308                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1309                                     (src_pobj != tgt_pobj));
1310                 if (rc == 0)
1311                         if (S_ISDIR(mdd_object_type(tobj))
1312                             && mdd_dir_is_empty(env, tobj))
1313                                 rc = -ENOTEMPTY;
1314         }
1315
1316         RETURN(rc);
1317 }
1318 /* src object can be remote that is why we use only fid and type of object */
1319 static int mdd_rename(const struct lu_env *env,
1320                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1321                       const struct lu_fid *lf, const char *sname,
1322                       struct md_object *tobj, const char *tname,
1323                       struct md_attr *ma)
1324 {
1325         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1326         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1327         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1328         struct mdd_device *mdd = mdo2mdd(src_pobj);
1329         struct mdd_object *mdd_sobj = NULL;
1330         struct mdd_object *mdd_tobj = NULL;
1331         struct dynlock_handle *sdlh, *tdlh;
1332         struct thandle *handle;
1333         int is_dir;
1334         int rc;
1335         ENTRY;
1336
1337         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1338         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1339         if (ma->ma_attr.la_valid & LA_FLAGS &&
1340             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1341                 RETURN(-EPERM);
1342
1343         if (tobj)
1344                 mdd_tobj = md2mdd_obj(tobj);
1345
1346         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1347         handle = mdd_trans_start(env, mdd);
1348         if (IS_ERR(handle))
1349                 RETURN(PTR_ERR(handle));
1350
1351         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1352         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1353         if (rc < 0)
1354                 GOTO(cleanup_unlocked, rc);
1355
1356         /* Get locks in determined order */
1357         if (rc == MDD_RN_SAME) {
1358                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1359                 /* check hashes to determine do we need one lock or two */
1360                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1361                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1362                 else
1363                         tdlh = sdlh;
1364         } else if (rc == MDD_RN_SRCTGT) {
1365                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1366                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1367         } else {
1368                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1369                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1370         }
1371         if (sdlh == NULL || tdlh == NULL)
1372                 GOTO(cleanup, rc = -ENOMEM);
1373
1374         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1375                                      lf, is_dir, mdd_tobj);
1376         if (rc)
1377                 GOTO(cleanup, rc);
1378
1379         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1380                                 mdd_object_capa(env, mdd_spobj));
1381         if (rc)
1382                 GOTO(cleanup, rc);
1383
1384         /*
1385          * Here tobj can be remote one, so we do index_delete unconditionally
1386          * and -ENOENT is allowed.
1387          */
1388         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1389                                 mdd_object_capa(env, mdd_tpobj));
1390         if (rc != 0 && rc != -ENOENT)
1391                 GOTO(cleanup, rc);
1392
1393         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1394                                 mdd_object_capa(env, mdd_tpobj));
1395         if (rc)
1396                 GOTO(cleanup, rc);
1397
1398         *la = ma->ma_attr;
1399         mdd_sobj = mdd_object_find(env, mdd, lf);
1400         if (mdd_sobj) {
1401                 la->la_valid = LA_CTIME;
1402
1403                 /* XXX: How to update ctime for remote sobj? */
1404                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1405                 if (rc)
1406                         GOTO(cleanup, rc);
1407         }
1408         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1409                 mdd_write_lock(env, mdd_tobj);
1410                 mdd_ref_del_internal(env, mdd_tobj, handle);
1411
1412                 /* Remove dot reference. */
1413                 if (is_dir)
1414                         mdd_ref_del_internal(env, mdd_tobj, handle);
1415
1416                 la->la_valid = LA_CTIME;
1417                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1418                 if (rc)
1419                         GOTO(cleanup, rc);
1420
1421                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1422                 mdd_write_unlock(env, mdd_tobj);
1423                 if (rc)
1424                         GOTO(cleanup, rc);
1425         }
1426
1427         la->la_valid = LA_CTIME | LA_MTIME;
1428         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1429         if (rc)
1430                 GOTO(cleanup, rc);
1431
1432         if (mdd_spobj != mdd_tpobj) {
1433                 la->la_valid = LA_CTIME | LA_MTIME;
1434                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1435                                                   handle, 0);
1436         }
1437
1438         EXIT;
1439 cleanup:
1440         if (likely(tdlh) && sdlh != tdlh)
1441                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1442         if (likely(sdlh))
1443                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1444 cleanup_unlocked:
1445         mdd_trans_stop(env, mdd, rc, handle);
1446         if (mdd_sobj)
1447                 mdd_object_put(env, mdd_sobj);
1448         return rc;
1449 }
1450
1451 struct md_dir_operations mdd_dir_ops = {
1452         .mdo_is_subdir     = mdd_is_subdir,
1453         .mdo_lookup        = mdd_lookup,
1454         .mdo_create        = mdd_create,
1455         .mdo_rename        = mdd_rename,
1456         .mdo_link          = mdd_link,
1457         .mdo_unlink        = mdd_unlink,
1458         .mdo_name_insert   = mdd_name_insert,
1459         .mdo_name_remove   = mdd_name_remove,
1460         .mdo_rename_tgt    = mdd_rename_tgt,
1461         .mdo_create_data   = mdd_create_data
1462 };