Whamcloud - gitweb
fix memleak in error case.
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid, struct md_op_spec *spec)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check)
200                 rc = mdd_permission_internal_locked(env, pobj, NULL,
201                                                     MAY_WRITE | MAY_EXEC);
202
203         RETURN(rc);
204 }
205
206 static inline int mdd_is_sticky(const struct lu_env *env,
207                                 struct mdd_object *pobj,
208                                 struct mdd_object *cobj)
209 {
210         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211         struct md_ucred *uc = md_ucred(env);
212         int rc;
213
214         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
215         if (rc) {
216                 return rc;
217         } else if (tmp_la->la_uid == uc->mu_fsuid) {
218                 return 0;
219         } else {
220                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221                 if (rc)
222                         return rc;
223                 else if (!(tmp_la->la_mode & S_ISVTX) ||
224                          (tmp_la->la_uid == uc->mu_fsuid))
225                         return 0;
226                 else
227                         return !mdd_capable(uc, CAP_FOWNER);
228         }
229 }
230
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233                           struct mdd_object *pobj,
234                           struct mdd_object *cobj,
235                           int is_dir, int need_check)
236 {
237         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
238         int rc = 0;
239         ENTRY;
240
241         LASSERT(cobj);
242
243         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
244                 RETURN(-ENOENT);
245
246         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
247                 RETURN(-EPERM);
248
249         if (is_dir) {
250                 if (!S_ISDIR(mdd_object_type(cobj)))
251                         RETURN(-ENOTDIR);
252
253                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
254                         RETURN(-EBUSY);
255
256         } else if (S_ISDIR(mdd_object_type(cobj))) {
257                 RETURN(-EISDIR);
258         }
259
260         if (pobj) {
261                 if (mdd_is_dead_obj(pobj))
262                         RETURN(-ENOENT);
263
264                 if (mdd_is_sticky(env, pobj, cobj))
265                         RETURN(-EPERM);
266
267                 if (need_check)
268                         rc = mdd_permission_internal_locked(env, pobj, NULL,
269                                                      MAY_WRITE | MAY_EXEC);
270         }
271         RETURN(rc);
272 }
273
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275                           struct mdd_object *src_obj)
276 {
277         int rc = 0;
278         ENTRY;
279
280         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
281                 RETURN(-EPERM);
282
283         if (S_ISDIR(mdd_object_type(src_obj)))
284                 RETURN(-EPERM);
285
286         LASSERT(src_obj != tgt_obj);
287         if (tgt_obj) {
288                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
289                 if (rc)
290                         RETURN(rc);
291         }
292
293         RETURN(rc);
294 }
295
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297                                    const struct lu_fid *fid)
298 {
299         struct mdd_thread_info *info = mdd_env_info(env);
300
301         fid_cpu_to_be(&info->mti_fid2, fid);
302         return (const struct dt_rec *)&info->mti_fid2;
303 }
304
305
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308                               const struct lu_fid *lf, const char *name, int is_dir,
309                               struct thandle *handle, struct lustre_capa *capa)
310 {
311         struct dt_object *next = mdd_object_child(pobj);
312         struct timeval    start;
313         int               rc;
314         ENTRY;
315
316         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317                                LPROC_MDD_INDEX_INSERT);
318         if (dt_try_as_dir(env, next)) {
319                 rc = next->do_index_ops->dio_insert(env, next,
320                                                     __mdd_fid_rec(env, lf),
321                                                     (const struct dt_key *)name,
322                                                     handle, capa);
323         } else {
324                 rc = -ENOTDIR;
325         }
326
327         if (rc == 0) {
328                 if (is_dir) {
329                         mdd_write_lock(env, pobj);
330                         mdd_ref_add_internal(env, pobj, handle);
331                         mdd_write_unlock(env, pobj);
332                 }
333         }
334         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335                              LPROC_MDD_INDEX_INSERT);
336         RETURN(rc);
337 }
338
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340                               const char *name, int is_dir, struct thandle *handle,
341                               struct lustre_capa *capa)
342 {
343         struct dt_object *next = mdd_object_child(pobj);
344         struct timeval    start;
345         int               rc;
346         ENTRY;
347
348         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349                                LPROC_MDD_INDEX_DELETE);
350
351         if (dt_try_as_dir(env, next)) {
352                 rc = next->do_index_ops->dio_delete(env, next,
353                                                     (struct dt_key *)name,
354                                                     handle, capa);
355                 if (rc == 0 && is_dir) {
356                         mdd_write_lock(env, pobj);
357                         mdd_ref_del_internal(env, pobj, handle);
358                         mdd_write_unlock(env, pobj);
359                 }
360         } else
361                 rc = -ENOTDIR;
362
363         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364                              LPROC_MDD_INDEX_DELETE);
365         RETURN(rc);
366 }
367
368 static int
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370                         const struct lu_fid *lf, const char *name,
371                         struct thandle *handle, struct lustre_capa *capa)
372 {
373         struct dt_object *next = mdd_object_child(pobj);
374         int               rc;
375         ENTRY;
376
377         if (dt_try_as_dir(env, next)) {
378                 rc = next->do_index_ops->dio_insert(env, next,
379                                                     __mdd_fid_rec(env, lf),
380                                                     (const struct dt_key *)name,
381                                                     handle, capa);
382         } else {
383                 rc = -ENOTDIR;
384         }
385         RETURN(rc);
386 }
387
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389                     struct md_object *src_obj, const char *name,
390                     struct md_attr *ma)
391 {
392         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
393         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395         struct mdd_device *mdd = mdo2mdd(src_obj);
396         struct dynlock_handle *dlh;
397         struct thandle *handle;
398         int rc;
399         ENTRY;
400
401         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402         handle = mdd_trans_start(env, mdd);
403         if (IS_ERR(handle))
404                 RETURN(PTR_ERR(handle));
405
406         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
407         if (dlh == NULL)
408                 GOTO(out_trans, rc = -ENOMEM);
409         mdd_write_lock(env, mdd_sobj);
410
411         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
412         if (rc)
413                 GOTO(out_unlock, rc);
414
415         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
416                                      name, handle,
417                                      mdd_object_capa(env, mdd_tobj));
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         mdd_ref_add_internal(env, mdd_sobj, handle);
422
423         *la = ma->ma_attr;
424         la->la_valid = LA_CTIME | LA_MTIME;
425         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
426         if (rc)
427                 GOTO(out_unlock, rc);
428
429         la->la_valid = LA_CTIME;
430         rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
431         EXIT;
432 out_unlock:
433         mdd_write_unlock(env, mdd_sobj);
434         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
435 out_trans:
436         mdd_trans_stop(env, mdd, rc, handle);
437         return rc;
438 }
439
440 /* caller should take a lock before calling */
441 int mdd_finish_unlink(const struct lu_env *env,
442                       struct mdd_object *obj, struct md_attr *ma,
443                       struct thandle *th)
444 {
445         int rc;
446         ENTRY;
447
448         rc = mdd_iattr_get(env, obj, ma);
449         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
450                 /* add new orphan and the object
451                  * will be deleted during the object_put() */
452                 if (__mdd_orphan_add(env, obj, th) == 0)
453                         obj->mod_flags |= ORPHAN_OBJ;
454
455                 if (obj->mod_count == 0)
456                         rc = mdd_object_kill(env, obj, ma);
457                 else
458                         /* clear MA_LOV | MA_COOKIE, if we do not
459                          * unlink it in case we get it somewhere */
460                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
461         } else
462                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
463
464         RETURN(rc);
465 }
466
467 /*
468  * Check that @dir contains no entries except (possibly) dot and dotdot.
469  *
470  * Returns:
471  *
472  *             0        empty
473  *    -ENOTEMPTY        not empty
474  *           -ve        other error
475  *
476  */
477 static int mdd_dir_is_empty(const struct lu_env *env,
478                             struct mdd_object *dir)
479 {
480         struct dt_it     *it;
481         struct dt_object *obj;
482         struct dt_it_ops *iops;
483         int result;
484         ENTRY;
485
486         obj = mdd_object_child(dir);
487         iops = &obj->do_index_ops->dio_it;
488         it = iops->init(env, obj, 0, BYPASS_CAPA);
489         if (it != NULL) {
490                 result = iops->get(env, it, (const void *)"");
491                 if (result > 0) {
492                         int i;
493                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
494                                 result = iops->next(env, it);
495                         if (result == 0)
496                                 result = -ENOTEMPTY;
497                         else if (result == +1)
498                                 result = 0;
499                 } else if (result == 0)
500                         /*
501                          * Huh? Index contains no zero key?
502                          */
503                         result = -EIO;
504
505                 iops->put(env, it);
506                 iops->fini(env, it);
507         } else
508                 result = -ENOMEM;
509         RETURN(result);
510 }
511
512 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
513                             struct mdd_object *cobj, struct md_attr *ma)
514 {
515         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
516         int rc = 0;
517         ENTRY;
518
519         rc = mdd_may_delete(env, pobj, cobj,
520                             S_ISDIR(ma->ma_attr.la_mode), 1);
521         if (rc)
522                 RETURN(rc);
523
524         if (S_ISDIR(mdd_object_type(cobj))) {
525                 if (dt_try_as_dir(env, dt_cobj))
526                         rc = mdd_dir_is_empty(env, cobj);
527                 else
528                         rc = -ENOTDIR;
529         }
530
531         RETURN(rc);
532 }
533
534 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
535                       struct md_object *cobj, const char *name,
536                       struct md_attr *ma)
537 {
538         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
539         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
540         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
541         struct mdd_device *mdd = mdo2mdd(pobj);
542         struct dynlock_handle *dlh;
543         struct thandle    *handle;
544         int rc, is_dir;
545         ENTRY;
546
547         /*
548          * Check -ENOENT early here because we need to get object type
549          * to calculate credits before transaction start
550          */
551         if (!lu_object_exists(&cobj->mo_lu))
552                 RETURN(-ENOENT);
553         LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is"DFID"\n",
554                  PFID(lu_object_fid(&cobj->mo_lu)));
555         
556         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
557         if (rc)
558                 RETURN(rc);
559
560         handle = mdd_trans_start(env, mdd);
561         if (IS_ERR(handle))
562                 RETURN(PTR_ERR(handle));
563
564         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
565         if (dlh == NULL)
566                 GOTO(out_trans, rc = -ENOMEM);
567         mdd_write_lock(env, mdd_cobj);
568
569         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
570         if (rc)
571                 GOTO(cleanup, rc);
572
573         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
574         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
575                                 mdd_object_capa(env, mdd_pobj));
576         if (rc)
577                 GOTO(cleanup, rc);
578
579         mdd_ref_del_internal(env, mdd_cobj, handle);
580         if (is_dir) {
581                 /* unlink dot */
582                 mdd_ref_del_internal(env, mdd_cobj, handle);
583         }
584
585         *la = ma->ma_attr;
586         la->la_valid = LA_CTIME | LA_MTIME;
587         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
588         if (rc)
589                 GOTO(cleanup, rc);
590
591         la->la_valid = LA_CTIME;
592         rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
593         if (rc)
594                 GOTO(cleanup, rc);
595
596         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
597
598         if (rc == 0)
599                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
600                                    strlen("unlinked"), "unlinked", 0,
601                                    NULL, NULL);
602         EXIT;
603 cleanup:
604         mdd_write_unlock(env, mdd_cobj);
605         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
606 out_trans:
607         mdd_trans_stop(env, mdd, rc, handle);
608         return rc;
609 }
610
611 static int mdd_ni_sanity_check(const struct lu_env *env,
612                                struct md_object *pobj,
613                                const char *name,
614                                const struct lu_fid *fid)
615 {
616         struct mdd_object *obj = md2mdd_obj(pobj);
617         ENTRY;
618
619         /* EEXIST check */
620         if (mdd_is_dead_obj(obj))
621                 RETURN(-ENOENT);
622
623         /* The exist of the name will be checked in _index_insert. */
624         RETURN(mdd_permission_internal_locked(env, obj, NULL,
625                                               MAY_WRITE | MAY_EXEC));
626 }
627
628 /*
629  * Partial operation.
630  */
631 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
632                            const char *name, const struct lu_fid *fid,
633                            int is_dir)
634 {
635         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
636         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
637         struct mdd_device *mdd = mdo2mdd(pobj);
638         struct dynlock_handle *dlh;
639         struct thandle *handle;
640         int rc;
641         ENTRY;
642
643         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
644         handle = mdd_trans_start(env, mdo2mdd(pobj));
645         if (IS_ERR(handle))
646                 RETURN(PTR_ERR(handle));
647
648         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
649         if (dlh == NULL)
650                 GOTO(out_trans, rc = -ENOMEM);
651         rc = mdd_ni_sanity_check(env, pobj, name, fid);
652         if (rc)
653                 GOTO(out_unlock, rc);
654
655         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
656                                 handle, BYPASS_CAPA);
657
658         la->la_ctime = la->la_atime = CURRENT_SECONDS;
659         la->la_valid = LA_ATIME | LA_CTIME;
660         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
661         EXIT;
662 out_unlock:
663         mdd_pdo_write_unlock(env, mdd_obj, dlh);
664 out_trans:
665         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
666         return rc;
667 }
668
669 static int mdd_nr_sanity_check(const struct lu_env *env,
670                                struct md_object *pobj,
671                                const char *name)
672 {
673         struct mdd_object *obj = md2mdd_obj(pobj);
674         ENTRY;
675
676         /* EEXIST check */
677         if (mdd_is_dead_obj(obj)) {
678                 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
679                 RETURN(-ENOENT);
680         }
681
682         /* Name presense will be checked in _index_delete. */
683         RETURN(mdd_permission_internal_locked(env, obj, NULL,
684                                               MAY_WRITE | MAY_EXEC));
685 }
686
687 /*
688  * Partial operation.
689  */
690 static int mdd_name_remove(const struct lu_env *env,
691                            struct md_object *pobj,
692                            const char *name, int is_dir)
693 {
694         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
695         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
696         struct mdd_device *mdd = mdo2mdd(pobj);
697         struct dynlock_handle *dlh;
698         struct thandle *handle;
699         int rc;
700         ENTRY;
701
702         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
703         handle = mdd_trans_start(env, mdd);
704         if (IS_ERR(handle))
705                 RETURN(PTR_ERR(handle));
706
707         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
708         if (dlh == NULL)
709                 GOTO(out_trans, rc = -ENOMEM);
710         rc = mdd_nr_sanity_check(env, pobj, name);
711         if (rc)
712                 GOTO(out_unlock, rc);
713
714         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
715                                 handle, BYPASS_CAPA);
716         if (rc)
717                 GOTO(out_unlock, rc);
718
719         la->la_ctime = la->la_mtime = CURRENT_SECONDS;
720         la->la_valid = LA_CTIME | LA_MTIME;
721         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
722         EXIT;
723 out_unlock:
724         mdd_pdo_write_unlock(env, mdd_obj, dlh);
725 out_trans:
726         mdd_trans_stop(env, mdd, rc, handle);
727         return rc;
728 }
729
730 static int mdd_rt_sanity_check(const struct lu_env *env,
731                                struct mdd_object *tgt_pobj,
732                                struct mdd_object *tobj,
733                                const struct lu_fid *sfid,
734                                const char *name, struct md_attr *ma)
735 {
736         int rc, src_is_dir;
737         ENTRY;
738
739         /* EEXIST check */
740         if (mdd_is_dead_obj(tgt_pobj))
741                 RETURN(-ENOENT);
742
743         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
744         if (tobj) {
745                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
746                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
747                      mdd_dir_is_empty(env, tobj))
748                                 RETURN(-ENOTEMPTY);
749         } else {
750                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
751         }
752
753         RETURN(rc);
754 }
755
756 static int mdd_rename_tgt(const struct lu_env *env,
757                           struct md_object *pobj, struct md_object *tobj,
758                           const struct lu_fid *lf, const char *name,
759                           struct md_attr *ma)
760 {
761         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
762         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
763         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
764         struct mdd_device *mdd = mdo2mdd(pobj);
765         struct dynlock_handle *dlh;
766         struct thandle *handle;
767         int rc;
768         ENTRY;
769
770         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
771         handle = mdd_trans_start(env, mdd);
772         if (IS_ERR(handle))
773                 RETURN(PTR_ERR(handle));
774
775         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
776         if (dlh == NULL)
777                 GOTO(out_trans, rc = -ENOMEM);
778         if (mdd_tobj)
779                 mdd_write_lock(env, mdd_tobj);
780
781         /* XXX: Rename sanity checking. */
782         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
783         if (rc)
784                 GOTO(cleanup, rc);
785
786         /*
787          * If rename_tgt is called then we should just re-insert name with
788          * correct fid, no need to dec/inc parent nlink if obj is dir.
789          */
790         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
791         if (rc)
792                 GOTO(cleanup, rc);
793
794         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
795                                      BYPASS_CAPA);
796         if (rc)
797                 GOTO(cleanup, rc);
798
799         *la = ma->ma_attr;
800         la->la_valid = LA_CTIME | LA_MTIME;
801         rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
802         if (rc)
803                 GOTO(cleanup, rc);
804
805         if (tobj && lu_object_exists(&tobj->mo_lu)) {
806                 mdd_ref_del_internal(env, mdd_tobj, handle);
807                 la->la_valid = LA_CTIME;
808                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
809         }
810         EXIT;
811 cleanup:
812         if (tobj)
813                 mdd_write_unlock(env, mdd_tobj);
814         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
815 out_trans:
816         mdd_trans_stop(env, mdd, rc, handle);
817         return rc;
818 }
819
820 /*
821  * The permission has been checked when obj created, no need check again.
822  */
823 static int mdd_cd_sanity_check(const struct lu_env *env,
824                                struct mdd_object *obj)
825 {
826         ENTRY;
827
828         /* EEXIST check */
829         if (!obj || mdd_is_dead_obj(obj))
830                 RETURN(-ENOENT);
831
832         RETURN(0);
833
834 }
835
836 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
837                            struct md_object *cobj, const struct md_op_spec *spec,
838                            struct md_attr *ma)
839 {
840         struct mdd_device *mdd = mdo2mdd(cobj);
841         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
842         struct mdd_object *son = md2mdd_obj(cobj);
843         struct lu_attr    *attr = &ma->ma_attr;
844         struct lov_mds_md *lmm = NULL;
845         int                lmm_size = 0;
846         struct thandle    *handle;
847         int                rc;
848         ENTRY;
849
850         rc = mdd_cd_sanity_check(env, son);
851         if (rc)
852                 RETURN(rc);
853
854         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
855             !(spec->sp_cr_flags & FMODE_WRITE))
856                 RETURN(0);
857         
858         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
859                             spec, attr);
860         if (rc)
861                 RETURN(rc);
862
863         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
864         handle = mdd_trans_start(env, mdd);
865         if (IS_ERR(handle))
866                 GOTO(out_free, rc = PTR_ERR(handle));
867
868         /*
869          * XXX: Setting the lov ea is not locked but setting the attr is locked?
870          * Should this be fixed?
871          */
872
873         /* Replay creates has objects already */
874         if (spec->u.sp_ea.no_lov_create) {
875                 CDEBUG(D_INFO, "we already have lov ea\n");
876                 rc = mdd_lov_set_md(env, mdd_pobj, son,
877                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
878                                     spec->u.sp_ea.eadatalen, handle, 0);
879         } else
880                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
881                                     lmm_size, handle, 0);
882
883         if (rc == 0)
884                rc = mdd_attr_get_internal_locked(env, son, ma);
885
886         mdd_trans_stop(env, mdd, rc, handle);
887 out_free:
888         /* Finish mdd_lov_create() stuff. */
889         mdd_lov_create_finish(env, mdd, rc);
890         if (lmm)
891                 OBD_FREE(lmm, lmm_size);
892         RETURN(rc);
893 }
894
895 static int
896 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
897              const char *name, struct lu_fid* fid, int mask)
898 {
899         const struct dt_key *key = (const struct dt_key *)name;
900         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
901         struct dt_object    *dir = mdd_object_child(mdd_obj);
902         struct dt_rec       *rec = (struct dt_rec *)fid;
903         struct timeval       start;
904         int rc;
905         ENTRY;
906
907         mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
908         if (mdd_is_dead_obj(mdd_obj))
909                 RETURN(-ENOENT);
910
911         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
912         if (rc == 0)
913                 RETURN(-ESTALE);
914         else if (rc < 0) {
915                 CERROR("Object "DFID" locates on remote server\n",
916                         PFID(mdo2fid(mdd_obj)));
917                 LBUG();
918         }
919
920         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
921         if (rc)
922                 RETURN(rc);
923
924         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
925                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
926                                                    mdd_object_capa(env, mdd_obj));
927                 if (rc == 0)
928                         fid_be_to_cpu(fid, fid);
929         } else
930                 rc = -ENOTDIR;
931
932         mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
933         RETURN(rc);
934 }
935
936 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
937                           struct mdd_object *child, struct md_attr *ma,
938                           struct thandle *handle)
939 {
940         int rc;
941         ENTRY;
942
943         /*
944          * Update attributes for child.
945          *
946          * FIXME:
947          *  (1) the valid bits should be converted between Lustre and Linux;
948          *  (2) maybe, the child attributes should be set in OSD when creation.
949          */
950
951         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
952         if (rc != 0)
953                 RETURN(rc);
954
955         if (S_ISDIR(ma->ma_attr.la_mode)) {
956                 /* Add "." and ".." for newly created dir */
957                 mdd_ref_add_internal(env, child, handle);
958                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
959                                              dot, handle, BYPASS_CAPA);
960                 if (rc == 0) {
961                         rc = __mdd_index_insert_only(env, child, pfid,
962                                                      dotdot, handle,
963                                                      BYPASS_CAPA);
964                         if (rc != 0) {
965                                 int rc2;
966
967                                 rc2 = __mdd_index_delete(env, child, dot, 0,
968                                                          handle, BYPASS_CAPA);
969                                 if (rc2 != 0)
970                                         CERROR("Failure to cleanup after dotdot"
971                                                " creation: %d (%d)\n", rc2, rc);
972                                 else
973                                         mdd_ref_del_internal(env, child, handle);
974                         }
975                 }
976         }
977         RETURN(rc);
978 }
979
980 static int mdd_create_sanity_check(const struct lu_env *env,
981                                    struct md_object *pobj,
982                                    const char *name,
983                                    struct md_attr *ma,
984                                    int lookup)
985 {
986         struct mdd_thread_info *info = mdd_env_info(env);
987         struct lu_attr    *la        = &info->mti_la;
988         struct lu_fid     *fid       = &info->mti_fid;
989         struct mdd_object *obj       = md2mdd_obj(pobj);
990         int rc;
991         ENTRY;
992
993         /* EEXIST check */
994         if (mdd_is_dead_obj(obj))
995                 RETURN(-ENOENT);
996
997         /*
998          * In some cases this lookup is not needed - we know before that if name
999          * exists or not.
1000          */
1001         /* XXX disable that lookup temporary */
1002         if (0 && lookup) {
1003                 /*
1004                  * Check if the name already exist, though it will be checked in
1005                  * _index_insert also, for avoiding rolling back if exists
1006                  * _index_insert.
1007                  */
1008                 rc = __mdd_lookup_locked(env, pobj, name, fid,
1009                                          MAY_WRITE | MAY_EXEC);
1010                 if (rc != -ENOENT)
1011                         RETURN(rc ? : -EEXIST);
1012         } else {
1013                 /*
1014                  * Check if has WRITE permission for the parent.
1015                  */
1016                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1017                 if (rc)
1018                         RETURN(rc);
1019         }
1020         
1021         /* sgid check */
1022         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1023         if (rc != 0)
1024                 RETURN(rc);
1025
1026         if (la->la_mode & S_ISGID) {
1027                 ma->ma_attr.la_gid = la->la_gid;
1028                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1029                         ma->ma_attr.la_mode |= S_ISGID;
1030                         ma->ma_attr.la_valid |= LA_MODE;
1031                 }
1032         }
1033
1034         switch (ma->ma_attr.la_mode & S_IFMT) {
1035         case S_IFREG:
1036         case S_IFDIR:
1037         case S_IFLNK:
1038         case S_IFCHR:
1039         case S_IFBLK:
1040         case S_IFIFO:
1041         case S_IFSOCK:
1042                 rc = 0;
1043                 break;
1044         default:
1045                 rc = -EINVAL;
1046                 break;
1047         }
1048         RETURN(rc);
1049 }
1050
1051 /*
1052  * Create object and insert it into namespace.
1053  */
1054 static int mdd_create(const struct lu_env *env,
1055                       struct md_object *pobj, const char *name,
1056                       struct md_object *child,
1057                       struct md_op_spec *spec,
1058                       struct md_attr* ma)
1059 {
1060         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1061         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1062         struct mdd_object *son = md2mdd_obj(child);
1063         struct mdd_device *mdd = mdo2mdd(pobj);
1064         struct lu_attr    *attr = &ma->ma_attr;
1065         struct lov_mds_md *lmm = NULL;
1066         struct thandle    *handle;
1067         int rc, created = 0, inserted = 0, lmm_size = 0;
1068         struct dynlock_handle *dlh;
1069         struct timeval  start;
1070         ENTRY;
1071
1072         mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1073
1074         /*
1075          * Two operations have to be performed:
1076          *
1077          *  - allocation of new object (->do_create()), and
1078          *
1079          *  - insertion into parent index (->dio_insert()).
1080          *
1081          * Due to locking, operation order is not important, when both are
1082          * successful, *but* error handling cases are quite different:
1083          *
1084          *  - if insertion is done first, and following object creation fails,
1085          *  insertion has to be rolled back, but this operation might fail
1086          *  also leaving us with dangling index entry.
1087          *
1088          *  - if creation is done first, is has to be undone if insertion
1089          *  fails, leaving us with leaked space, which is neither good, nor
1090          *  fatal.
1091          *
1092          * It seems that creation-first is simplest solution, but it is
1093          * sub-optimal in the frequent
1094          *
1095          *         $ mkdir foo
1096          *         $ mkdir foo
1097          *
1098          * case, because second mkdir is bound to create object, only to
1099          * destroy it immediately.
1100          *
1101          * To avoid this follow local file systems that do double lookup:
1102          *
1103          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1104          *
1105          *     1. create            (mdd_object_create_internal())
1106          *
1107          *     2. insert            (__mdd_index_insert(), lookup again)
1108          */
1109
1110         /* Sanity checks before big job. */
1111         rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1112         if (rc)
1113                 RETURN(rc);
1114         
1115         /*
1116          * No RPC inside the transaction, so OST objects should be created at
1117          * first.
1118          */
1119         if (S_ISREG(attr->la_mode)) {
1120                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1121                                     spec, attr);
1122                 if (rc)
1123                         RETURN(rc);
1124         }
1125
1126         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1127         handle = mdd_trans_start(env, mdd);
1128         if (IS_ERR(handle))
1129                 GOTO(out_free, rc = PTR_ERR(handle));
1130
1131         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1132         if (dlh == NULL)
1133                 GOTO(out_trans, rc = -ENOMEM);
1134
1135         /*
1136          * XXX: Check that link can be added to the parent in mkdir case.
1137          */
1138
1139         mdd_write_lock(env, son);
1140         rc = mdd_object_create_internal(env, son, ma, handle);
1141         if (rc) {
1142                 mdd_write_unlock(env, son);
1143                 GOTO(cleanup, rc);
1144         }
1145
1146         created = 1;
1147
1148 #ifdef CONFIG_FS_POSIX_ACL
1149         mdd_read_lock(env, mdd_pobj);
1150         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1151         mdd_read_unlock(env, mdd_pobj);
1152         if (rc) {
1153                 mdd_write_unlock(env, son);
1154                 GOTO(cleanup, rc);
1155         } else {
1156                 ma->ma_attr.la_valid |= LA_MODE;
1157         }
1158 #endif
1159
1160         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1161                                    son, ma, handle);
1162         mdd_write_unlock(env, son);
1163         if (rc)
1164                 /*
1165                  * Object has no links, so it will be destroyed when last
1166                  * reference is released. (XXX not now.)
1167                  */
1168                 GOTO(cleanup, rc);
1169
1170         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1171                                 name, S_ISDIR(attr->la_mode), handle,
1172                                 mdd_object_capa(env, mdd_pobj));
1173
1174         if (rc)
1175                 GOTO(cleanup, rc);
1176
1177         inserted = 1;
1178
1179         /* Replay creates has objects already. */
1180         if (spec->u.sp_ea.no_lov_create) {
1181                 CDEBUG(D_INFO, "we already have lov ea\n");
1182                 LASSERT(lmm == NULL);
1183                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1184                 lmm_size = spec->u.sp_ea.eadatalen;
1185         }
1186         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1187         if (rc) {
1188                 CERROR("error on stripe info copy %d \n", rc);
1189                 GOTO(cleanup, rc);
1190         }
1191         if (lmm && lmm_size > 0) {
1192                 /* Set Lov here, do not get lmm again later */
1193                 memcpy(ma->ma_lmm, lmm, lmm_size);
1194                 ma->ma_lmm_size = lmm_size;
1195                 ma->ma_valid |= MA_LOV;
1196         }
1197
1198         if (S_ISLNK(attr->la_mode)) {
1199                 struct dt_object *dt = mdd_object_child(son);
1200                 const char *target_name = spec->u.sp_symname;
1201                 int sym_len = strlen(target_name);
1202                 const struct lu_buf *buf;
1203                 loff_t pos = 0;
1204
1205                 buf = mdd_buf_get_const(env, target_name, sym_len);
1206                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1207                                                 mdd_object_capa(env, son));
1208
1209                 if (rc == sym_len)
1210                         rc = 0;
1211                 else
1212                         GOTO(cleanup, rc = -EFAULT);
1213         }
1214
1215         *la = ma->ma_attr;
1216         la->la_valid = LA_CTIME | LA_MTIME;
1217         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1218         if (rc)
1219                 GOTO(cleanup, rc);
1220
1221         /* Return attr back. */
1222         rc = mdd_attr_get_internal_locked(env, son, ma);
1223         EXIT;
1224 cleanup:
1225         if (rc && created) {
1226                 int rc2 = 0;
1227
1228                 if (inserted) {
1229                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1230                                                  S_ISDIR(attr->la_mode),
1231                                                  handle, BYPASS_CAPA);
1232                         if (rc2)
1233                                 CERROR("error can not cleanup destroy %d\n",
1234                                        rc2);
1235                 }
1236                 if (rc2 == 0) {
1237                         mdd_write_lock(env, son);
1238                         mdd_ref_del_internal(env, son, handle);
1239                         mdd_write_unlock(env, son);
1240                 }
1241         }
1242
1243         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1244 out_trans:
1245         mdd_trans_stop(env, mdd, rc, handle);
1246 out_free:
1247         if (lmm && !spec->u.sp_ea.no_lov_create)
1248                 OBD_FREE(lmm, lmm_size);
1249         /* Finish mdd_lov_create() stuff */
1250         mdd_lov_create_finish(env, mdd, rc);
1251         mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1252         return rc;
1253 }
1254
1255 /*
1256  * Get locks on parents in proper order
1257  * RETURN: < 0 - error, rename_order if successful
1258  */
1259 enum rename_order {
1260         MDD_RN_SAME,
1261         MDD_RN_SRCTGT,
1262         MDD_RN_TGTSRC
1263 };
1264
1265 static int mdd_rename_order(const struct lu_env *env,
1266                             struct mdd_device *mdd,
1267                             struct mdd_object *src_pobj,
1268                             struct mdd_object *tgt_pobj)
1269 {
1270         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1271         int rc;
1272         ENTRY;
1273
1274         if (src_pobj == tgt_pobj)
1275                 RETURN(MDD_RN_SAME);
1276
1277         /* compared the parent child relationship of src_p&tgt_p */
1278         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1279                 rc = MDD_RN_SRCTGT;
1280         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1281                 rc = MDD_RN_TGTSRC;
1282         } else {
1283                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1284                 if (rc == -EREMOTE)
1285                         rc = 0;
1286
1287                 if (rc == 1)
1288                         rc = MDD_RN_TGTSRC;
1289                 else if (rc == 0)
1290                         rc = MDD_RN_SRCTGT;
1291         }
1292
1293         RETURN(rc);
1294 }
1295
1296 static int mdd_rename_sanity_check(const struct lu_env *env,
1297                                    struct mdd_object *src_pobj,
1298                                    struct mdd_object *tgt_pobj,
1299                                    const struct lu_fid *sfid,
1300                                    int src_is_dir,
1301                                    struct mdd_object *tobj)
1302 {
1303         int rc;
1304         ENTRY;
1305
1306         if (mdd_is_dead_obj(src_pobj))
1307                 RETURN(-ENOENT);
1308
1309         /* The sobj maybe on the remote, check parent permission only here */
1310         rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1311                                             MAY_WRITE | MAY_EXEC);
1312         if (rc)
1313                 RETURN(rc);
1314
1315         if (!tobj) {
1316                 rc = mdd_may_create(env, tgt_pobj, NULL,
1317                                     (src_pobj != tgt_pobj));
1318         } else {
1319                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1320                                     (src_pobj != tgt_pobj));
1321                 if (rc == 0)
1322                         if (S_ISDIR(mdd_object_type(tobj))
1323                             && mdd_dir_is_empty(env, tobj))
1324                                 rc = -ENOTEMPTY;
1325         }
1326
1327         RETURN(rc);
1328 }
1329 /* src object can be remote that is why we use only fid and type of object */
1330 static int mdd_rename(const struct lu_env *env,
1331                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1332                       const struct lu_fid *lf, const char *sname,
1333                       struct md_object *tobj, const char *tname,
1334                       struct md_attr *ma)
1335 {
1336         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1337         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1338         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1339         struct mdd_device *mdd = mdo2mdd(src_pobj);
1340         struct mdd_object *mdd_sobj = NULL;
1341         struct mdd_object *mdd_tobj = NULL;
1342         struct dynlock_handle *sdlh, *tdlh;
1343         struct thandle *handle;
1344         int is_dir;
1345         int rc;
1346         ENTRY;
1347
1348         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1349         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1350         if (ma->ma_attr.la_valid & LA_FLAGS &&
1351             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1352                 RETURN(-EPERM);
1353
1354         if (tobj)
1355                 mdd_tobj = md2mdd_obj(tobj);
1356
1357         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1358         handle = mdd_trans_start(env, mdd);
1359         if (IS_ERR(handle))
1360                 RETURN(PTR_ERR(handle));
1361
1362         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1363         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1364         if (rc < 0)
1365                 GOTO(cleanup_unlocked, rc);
1366
1367         /* Get locks in determined order */
1368         if (rc == MDD_RN_SAME) {
1369                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1370                 /* check hashes to determine do we need one lock or two */
1371                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1372                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1373                 else
1374                         tdlh = sdlh;
1375         } else if (rc == MDD_RN_SRCTGT) {
1376                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1377                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1378         } else {
1379                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1380                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1381         }
1382         if (sdlh == NULL || tdlh == NULL)
1383                 GOTO(cleanup, rc = -ENOMEM);
1384
1385         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1386                                      lf, is_dir, mdd_tobj);
1387         if (rc)
1388                 GOTO(cleanup, rc);
1389
1390         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1391                                 mdd_object_capa(env, mdd_spobj));
1392         if (rc)
1393                 GOTO(cleanup, rc);
1394
1395         /*
1396          * Here tobj can be remote one, so we do index_delete unconditionally
1397          * and -ENOENT is allowed.
1398          */
1399         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1400                                 mdd_object_capa(env, mdd_tpobj));
1401         if (rc != 0 && rc != -ENOENT)
1402                 GOTO(cleanup, rc);
1403
1404         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1405                                 mdd_object_capa(env, mdd_tpobj));
1406         if (rc)
1407                 GOTO(cleanup, rc);
1408
1409         *la = ma->ma_attr;
1410         mdd_sobj = mdd_object_find(env, mdd, lf);
1411         if (mdd_sobj) {
1412                 la->la_valid = LA_CTIME;
1413
1414                 /* XXX: How to update ctime for remote sobj? */
1415                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1416                 if (rc)
1417                         GOTO(cleanup, rc);
1418         }
1419         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1420                 mdd_write_lock(env, mdd_tobj);
1421                 mdd_ref_del_internal(env, mdd_tobj, handle);
1422
1423                 /* Remove dot reference. */
1424                 if (is_dir)
1425                         mdd_ref_del_internal(env, mdd_tobj, handle);
1426
1427                 la->la_valid = LA_CTIME;
1428                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1429                 if (rc)
1430                         GOTO(cleanup, rc);
1431
1432                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1433                 mdd_write_unlock(env, mdd_tobj);
1434                 if (rc)
1435                         GOTO(cleanup, rc);
1436         }
1437
1438         la->la_valid = LA_CTIME | LA_MTIME;
1439         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1440         if (rc)
1441                 GOTO(cleanup, rc);
1442
1443         if (mdd_spobj != mdd_tpobj) {
1444                 la->la_valid = LA_CTIME | LA_MTIME;
1445                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1446                                                   handle, 0);
1447         }
1448
1449         EXIT;
1450 cleanup:
1451         if (likely(tdlh) && sdlh != tdlh)
1452                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1453         if (likely(sdlh))
1454                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1455 cleanup_unlocked:
1456         mdd_trans_stop(env, mdd, rc, handle);
1457         if (mdd_sobj)
1458                 mdd_object_put(env, mdd_sobj);
1459         return rc;
1460 }
1461
1462 struct md_dir_operations mdd_dir_ops = {
1463         .mdo_is_subdir     = mdd_is_subdir,
1464         .mdo_lookup        = mdd_lookup,
1465         .mdo_create        = mdd_create,
1466         .mdo_rename        = mdd_rename,
1467         .mdo_link          = mdd_link,
1468         .mdo_unlink        = mdd_unlink,
1469         .mdo_name_insert   = mdd_name_insert,
1470         .mdo_name_remove   = mdd_name_remove,
1471         .mdo_rename_tgt    = mdd_rename_tgt,
1472         .mdo_create_data   = mdd_create_data
1473 };