Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 static int mdd_lookup(const struct lu_env *env,
93                       struct md_object *pobj, const struct lu_name *lname,
94                       struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102
103 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
104                           struct lu_fid *fid)
105 {
106         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
107 }
108
109 /*
110  * For root fid use special function, whcih does not compare version component
111  * of fid. Vresion component is different for root fids on all MDTs.
112  */
113 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
114 {
115         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
116                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
117 }
118
119 /*
120  * return 1: if lf is the fid of the ancestor of p1;
121  * return 0: if not;
122  *
123  * return -EREMOTE: if remote object is found, in this
124  * case fid of remote object is saved to @pf;
125  *
126  * otherwise: values < 0, errors.
127  */
128 static int mdd_is_parent(const struct lu_env *env,
129                          struct mdd_device *mdd,
130                          struct mdd_object *p1,
131                          const struct lu_fid *lf,
132                          struct lu_fid *pf)
133 {
134         struct mdd_object *parent = NULL;
135         struct lu_fid *pfid;
136         int rc;
137         ENTRY;
138
139         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
140         pfid = &mdd_env_info(env)->mti_fid;
141
142         /* Check for root first. */
143         if (mdd_is_root(mdd, mdo2fid(p1)))
144                 RETURN(0);
145
146         for(;;) {
147                 /* this is done recursively, bypass capa for each obj */
148                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
149                 rc = mdd_parent_fid(env, p1, pfid);
150                 if (rc)
151                         GOTO(out, rc);
152                 if (mdd_is_root(mdd, pfid))
153                         GOTO(out, rc = 0);
154                 if (lu_fid_eq(pfid, lf))
155                         GOTO(out, rc = 1);
156                 if (parent)
157                         mdd_object_put(env, parent);
158                 parent = mdd_object_find(env, mdd, pfid);
159
160                 /* cross-ref parent */
161                 if (parent == NULL) {
162                         if (pf != NULL)
163                                 *pf = *pfid;
164                         GOTO(out, rc = -EREMOTE);
165                 } else if (IS_ERR(parent))
166                         GOTO(out, rc = PTR_ERR(parent));
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 static int mdd_is_subdir(const struct lu_env *env,
188                          struct md_object *mo, const struct lu_fid *fid,
189                          struct lu_fid *sfid)
190 {
191         struct mdd_device *mdd = mdo2mdd(mo);
192         int rc;
193         ENTRY;
194
195         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
196                 RETURN(0);
197
198         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
199         if (rc == 0) {
200                 /* found root */
201                 fid_zero(sfid);
202         } else if (rc == 1) {
203                 /* found @fid is parent */
204                 *sfid = *fid;
205                 rc = 0;
206         }
207         RETURN(rc);
208 }
209
210 /*
211  * Check that @dir contains no entries except (possibly) dot and dotdot.
212  *
213  * Returns:
214  *
215  *             0        empty
216  *      -ENOTDIR        not a directory object
217  *    -ENOTEMPTY        not empty
218  *           -ve        other error
219  *
220  */
221 static int mdd_dir_is_empty(const struct lu_env *env,
222                             struct mdd_object *dir)
223 {
224         struct dt_it     *it;
225         struct dt_object *obj;
226         struct dt_it_ops *iops;
227         int result;
228         ENTRY;
229
230         obj = mdd_object_child(dir);
231         if (!dt_try_as_dir(env, obj))
232                 RETURN(-ENOTDIR);
233
234         iops = &obj->do_index_ops->dio_it;
235         it = iops->init(env, obj, 0, BYPASS_CAPA);
236         if (it != NULL) {
237                 result = iops->get(env, it, (const void *)"");
238                 if (result > 0) {
239                         int i;
240                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
241                                 result = iops->next(env, it);
242                         if (result == 0)
243                                 result = -ENOTEMPTY;
244                         else if (result == +1)
245                                 result = 0;
246                 } else if (result == 0)
247                         /*
248                          * Huh? Index contains no zero key?
249                          */
250                         result = -EIO;
251
252                 iops->put(env, it);
253                 iops->fini(env, it);
254         } else
255                 result = -ENOMEM;
256         RETURN(result);
257 }
258
259 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
260 {
261         struct mdd_device *m = mdd_obj2mdd_dev(obj);
262         struct lu_attr *la = &mdd_env_info(env)->mti_la;
263         int rc;
264         ENTRY;
265
266         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
267         if (rc)
268                 RETURN(rc);
269
270         /*
271          * Subdir count limitation can be broken through.
272          */ 
273         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
274             !S_ISDIR(la->la_mode))
275                 RETURN(-EMLINK);
276         else
277                 RETURN(0);
278 }
279
280 /*
281  * Check whether it may create the cobj under the pobj.
282  * cobj maybe NULL
283  */
284 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
285                    struct mdd_object *cobj, int check_perm, int check_nlink)
286 {
287         int rc = 0;
288         ENTRY;
289
290         if (cobj && mdd_object_exists(cobj))
291                 RETURN(-EEXIST);
292
293         if (mdd_is_dead_obj(pobj))
294                 RETURN(-ENOENT);
295
296         if (check_perm)
297                 rc = mdd_permission_internal_locked(env, pobj, NULL,
298                                                     MAY_WRITE | MAY_EXEC);
299
300         if (!rc && check_nlink)
301                 rc = __mdd_may_link(env, pobj);
302
303         RETURN(rc);
304 }
305
306 /*
307  * Check whether can unlink from the pobj in the case of "cobj == NULL".
308  */
309 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
310                    const struct md_attr *ma)
311 {
312         int rc;
313         ENTRY;
314
315         if (mdd_is_dead_obj(pobj))
316                 RETURN(-ENOENT);
317
318         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
319             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
320                 RETURN(-EPERM);
321
322         rc = mdd_permission_internal_locked(env, pobj, NULL,
323                                             MAY_WRITE | MAY_EXEC);
324         if (rc)
325                 RETURN(rc);
326
327         if (mdd_is_append(pobj))
328                 RETURN(-EPERM);
329
330         RETURN(rc);
331 }
332
333 /*
334  * pobj == NULL is remote ops case, under such case, pobj's
335  * VTX feature has been checked already, no need check again.
336  */
337 static inline int mdd_is_sticky(const struct lu_env *env,
338                                 struct mdd_object *pobj,
339                                 struct mdd_object *cobj)
340 {
341         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
342         struct md_ucred *uc = md_ucred(env);
343         int rc;
344
345         if (pobj) {
346                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
347                 if (rc)
348                         return rc;
349         
350                 if (!(tmp_la->la_mode & S_ISVTX) ||
351                      (tmp_la->la_uid == uc->mu_fsuid))
352                         return 0;
353         }
354
355         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
356         if (rc) 
357                 return rc;
358         
359         if (tmp_la->la_uid == uc->mu_fsuid)
360                 return 0;
361         
362         return !mdd_capable(uc, CAP_FOWNER);
363 }
364
365 /*
366  * Check whether it may delete the cobj from the pobj.
367  * pobj maybe NULL
368  */
369 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
370                    struct mdd_object *cobj, struct md_attr *ma,
371                    int check_perm, int check_empty)
372 {
373         int rc = 0;
374         ENTRY;
375
376         LASSERT(cobj);
377         if (!mdd_object_exists(cobj))
378                 RETURN(-ENOENT);
379
380         if (pobj) {
381                 if (mdd_is_dead_obj(pobj))
382                         RETURN(-ENOENT);
383
384                 if (check_perm) {
385                         rc = mdd_permission_internal_locked(env, pobj, NULL,
386                                                     MAY_WRITE | MAY_EXEC);
387                         if (rc)
388                                 RETURN(rc);
389                 }
390
391                 if (mdd_is_append(pobj))
392                         RETURN(-EPERM);
393         }
394
395         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
396             mdd_is_sticky(env, pobj, cobj))
397                 RETURN(-EPERM);
398
399         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
400                 RETURN(-EPERM);
401
402         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
403             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
404                 RETURN(-EPERM);
405
406         if (S_ISDIR(ma->ma_attr.la_mode)) {
407                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
408
409                 if (!S_ISDIR(mdd_object_type(cobj)))
410                         RETURN(-ENOTDIR);
411
412                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
413                         RETURN(-EBUSY);
414         } else if (S_ISDIR(mdd_object_type(cobj)))
415                 RETURN(-EISDIR);
416
417         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
418                 rc = mdd_dir_is_empty(env, cobj);
419
420         RETURN(rc);
421 }
422
423 /*
424  * tgt maybe NULL
425  * has mdd_write_lock on src already, but not on tgt yet
426  */
427 int mdd_link_sanity_check(const struct lu_env *env,
428                           struct mdd_object *tgt_obj,
429                           const struct lu_name *lname,
430                           struct mdd_object *src_obj)
431 {
432         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
433         int rc = 0;
434         ENTRY;
435
436         /* Local ops, no lookup before link, check filename length here. */
437         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
438                 RETURN(-ENAMETOOLONG);
439
440         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
441                 RETURN(-EPERM);
442
443         if (S_ISDIR(mdd_object_type(src_obj)))
444                 RETURN(-EPERM);
445
446         LASSERT(src_obj != tgt_obj);
447         if (tgt_obj) {
448                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
449                 if (rc)
450                         RETURN(rc);
451         }
452
453         rc = __mdd_may_link(env, src_obj);
454
455         RETURN(rc);
456 }
457
458 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
459                                    const struct lu_fid *fid)
460 {
461         struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
462
463         fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
464         return (const struct dt_rec *)pack;
465 }
466
467 /**
468  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
469  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
470  * (maybe too large to be represented). It is a trick to break through the
471  * "i_nlink" limitation for subdir count.
472  */
473 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
474                    struct thandle *handle)
475 {
476         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
477         struct mdd_device *m = mdd_obj2mdd_dev(obj);
478
479         if (!mdd_is_mnlink(obj)) {
480                 if (S_ISDIR(mdd_object_type(obj))) {
481                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
482                                 return;
483
484                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
485                                 obj->mod_flags |= MNLINK_OBJ;
486                                 tmp_la->la_nlink = 1;
487                                 tmp_la->la_valid = LA_NLINK;
488                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
489                                                       0);
490                                 return;
491                         }
492                 }
493                 mdo_ref_add(env, obj, handle);
494         }
495 }
496
497 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
498                    struct thandle *handle, int is_dot)
499 {
500         if (!mdd_is_mnlink(obj) || is_dot)
501                 mdo_ref_del(env, obj, handle);
502 }
503
504 /* insert named index, add reference if isdir */
505 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
506                               const struct lu_fid *lf, const char *name, int is_dir,
507                               struct thandle *handle, struct lustre_capa *capa)
508 {
509         struct dt_object *next = mdd_object_child(pobj);
510         int               rc;
511         ENTRY;
512
513         if (dt_try_as_dir(env, next)) {
514                 rc = next->do_index_ops->dio_insert(env, next,
515                                                     __mdd_fid_rec(env, lf),
516                                                     (const struct dt_key *)name,
517                                                     handle, capa);
518         } else {
519                 rc = -ENOTDIR;
520         }
521
522         if (rc == 0) {
523                 if (is_dir) {
524                         mdd_write_lock(env, pobj);
525                         __mdd_ref_add(env, pobj, handle);
526                         mdd_write_unlock(env, pobj);
527                 }
528         }
529         RETURN(rc);
530 }
531
532 /* delete named index, drop reference if isdir */
533 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
534                               const char *name, int is_dir, struct thandle *handle,
535                               struct lustre_capa *capa)
536 {
537         struct dt_object *next = mdd_object_child(pobj);
538         int               rc;
539         ENTRY;
540
541         if (dt_try_as_dir(env, next)) {
542                 rc = next->do_index_ops->dio_delete(env, next,
543                                                     (struct dt_key *)name,
544                                                     handle, capa);
545                 if (rc == 0 && is_dir) {
546                         int is_dot = 0;
547
548                         if (name != NULL && name[0] == '.' && name[1] == 0)
549                                 is_dot = 1;
550                         mdd_write_lock(env, pobj);
551                         __mdd_ref_del(env, pobj, handle, is_dot);
552                         mdd_write_unlock(env, pobj);
553                 }
554         } else
555                 rc = -ENOTDIR;
556
557         RETURN(rc);
558 }
559
560 static int
561 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
562                         const struct lu_fid *lf, const char *name,
563                         struct thandle *handle, struct lustre_capa *capa)
564 {
565         struct dt_object *next = mdd_object_child(pobj);
566         int               rc;
567         ENTRY;
568
569         if (dt_try_as_dir(env, next)) {
570                 rc = next->do_index_ops->dio_insert(env, next,
571                                                     __mdd_fid_rec(env, lf),
572                                                     (const struct dt_key *)name,
573                                                     handle, capa);
574         } else {
575                 rc = -ENOTDIR;
576         }
577         RETURN(rc);
578 }
579
580 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
581                     struct md_object *src_obj, const struct lu_name *lname,
582                     struct md_attr *ma)
583 {
584         char *name = lname->ln_name;
585         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
586         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
587         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
588         struct mdd_device *mdd = mdo2mdd(src_obj);
589         struct dynlock_handle *dlh;
590         struct thandle *handle;
591         int rc;
592         ENTRY;
593
594         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
595         handle = mdd_trans_start(env, mdd);
596         if (IS_ERR(handle))
597                 RETURN(PTR_ERR(handle));
598
599         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
600         if (dlh == NULL)
601                 GOTO(out_trans, rc = -ENOMEM);
602         mdd_write_lock(env, mdd_sobj);
603
604         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
605         if (rc)
606                 GOTO(out_unlock, rc);
607
608         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
609                                      name, handle,
610                                      mdd_object_capa(env, mdd_tobj));
611         if (rc)
612                 GOTO(out_unlock, rc);
613
614         __mdd_ref_add(env, mdd_sobj, handle);
615
616         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
617         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
618
619         la->la_valid = LA_CTIME | LA_MTIME;
620         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
621         if (rc)
622                 GOTO(out_unlock, rc);
623
624         la->la_valid = LA_CTIME;
625         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
626         EXIT;
627 out_unlock:
628         mdd_write_unlock(env, mdd_sobj);
629         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
630 out_trans:
631         mdd_trans_stop(env, mdd, rc, handle);
632         return rc;
633 }
634
635 /* caller should take a lock before calling */
636 int mdd_finish_unlink(const struct lu_env *env,
637                       struct mdd_object *obj, struct md_attr *ma,
638                       struct thandle *th)
639 {
640         int rc;
641         ENTRY;
642
643         rc = mdd_iattr_get(env, obj, ma);
644         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
645                 /* add new orphan and the object
646                  * will be deleted during the object_put() */
647                 if (__mdd_orphan_add(env, obj, th) == 0)
648                         obj->mod_flags |= ORPHAN_OBJ;
649
650                 obj->mod_flags |= DEAD_OBJ;
651                 if (obj->mod_count == 0)
652                         rc = mdd_object_kill(env, obj, ma);
653                 else
654                         /* clear MA_LOV | MA_COOKIE, if we do not
655                          * unlink it in case we get it somewhere */
656                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
657         } else
658                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
659
660         RETURN(rc);
661 }
662
663 /*
664  * pobj maybe NULL
665  * has mdd_write_lock on cobj already, but not on pobj yet
666  */
667 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
668                             struct mdd_object *cobj, struct md_attr *ma)
669 {
670         int rc;
671         ENTRY;
672
673         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
674
675         RETURN(rc);
676 }
677
678 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
679                       struct md_object *cobj, const struct lu_name *lname,
680                       struct md_attr *ma)
681 {
682         char *name = lname->ln_name;
683         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
684         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
685         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
686         struct mdd_device *mdd = mdo2mdd(pobj);
687         struct dynlock_handle *dlh;
688         struct thandle    *handle;
689         int rc, is_dir;
690         ENTRY;
691
692         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
693                  PFID(mdd_object_fid(mdd_cobj)));
694
695         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
696         if (rc)
697                 RETURN(rc);
698
699         handle = mdd_trans_start(env, mdd);
700         if (IS_ERR(handle))
701                 RETURN(PTR_ERR(handle));
702
703
704         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
705         if (dlh == NULL)
706                 GOTO(out_trans, rc = -ENOMEM);
707         mdd_write_lock(env, mdd_cobj);
708
709         is_dir = S_ISDIR(ma->ma_attr.la_mode);
710         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
711         if (rc)
712                 GOTO(cleanup, rc);
713
714         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
715                                 mdd_object_capa(env, mdd_pobj));
716         if (rc)
717                 GOTO(cleanup, rc);
718
719         __mdd_ref_del(env, mdd_cobj, handle, 0);
720         if (is_dir)
721                 /* unlink dot */
722                 __mdd_ref_del(env, mdd_cobj, handle, 1);
723
724         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
725         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
726
727         la->la_valid = LA_CTIME | LA_MTIME;
728         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
729         if (rc)
730                 GOTO(cleanup, rc);
731
732         la->la_valid = LA_CTIME;
733         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
734         if (rc)
735                 GOTO(cleanup, rc);
736
737         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
738
739         if (rc == 0)
740                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
741                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
742                                    NULL, NULL);
743         EXIT;
744 cleanup:
745         mdd_write_unlock(env, mdd_cobj);
746         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
747 out_trans:
748         mdd_trans_stop(env, mdd, rc, handle);
749         return rc;
750 }
751
752 /* has not lock on pobj yet */
753 static int mdd_ni_sanity_check(const struct lu_env *env,
754                                struct md_object *pobj,
755                                const struct md_attr *ma)
756 {
757         struct mdd_object *obj = md2mdd_obj(pobj);
758         int rc;
759         ENTRY;
760
761         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
762                 RETURN(0);
763
764         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
765
766         RETURN(rc);
767 }
768
769 /*
770  * Partial operation.
771  */
772 static int mdd_name_insert(const struct lu_env *env,
773                            struct md_object *pobj,
774                            const struct lu_name *lname,
775                            const struct lu_fid *fid,
776                            const struct md_attr *ma)
777 {
778         char *name = lname->ln_name;
779         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
780         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
781         struct mdd_device *mdd = mdo2mdd(pobj);
782         struct dynlock_handle *dlh;
783         struct thandle *handle;
784         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
785         int rc;
786         ENTRY;
787
788         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
789         handle = mdd_trans_start(env, mdo2mdd(pobj));
790         if (IS_ERR(handle))
791                 RETURN(PTR_ERR(handle));
792
793         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
794         if (dlh == NULL)
795                 GOTO(out_trans, rc = -ENOMEM);
796
797         rc = mdd_ni_sanity_check(env, pobj, ma);
798         if (rc)
799                 GOTO(out_unlock, rc);
800
801         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
802                                 handle, BYPASS_CAPA);
803         if (rc)
804                 GOTO(out_unlock, rc);
805
806         /*
807          * For some case, no need update obj's ctime (LA_CTIME is not set),
808          * e.g. split_dir.
809          * For other cases, update obj's ctime (LA_CTIME is set),
810          * e.g. cmr_link.
811          */
812         if (ma->ma_attr.la_valid & LA_CTIME) {
813                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
814                 la->la_valid = LA_CTIME | LA_MTIME;
815                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
816                                                         handle, 0);
817         }
818         EXIT;
819 out_unlock:
820         mdd_pdo_write_unlock(env, mdd_obj, dlh);
821 out_trans:
822         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
823         return rc;
824 }
825
826 /* has not lock on pobj yet */
827 static int mdd_nr_sanity_check(const struct lu_env *env,
828                                struct md_object *pobj,
829                                const struct md_attr *ma)
830 {
831         struct mdd_object *obj = md2mdd_obj(pobj);
832         int rc;
833         ENTRY;
834
835         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
836                 RETURN(0);
837
838         rc = mdd_may_unlink(env, obj, ma);
839
840         RETURN(rc);
841 }
842
843 /*
844  * Partial operation.
845  */
846 static int mdd_name_remove(const struct lu_env *env,
847                            struct md_object *pobj,
848                            const struct lu_name *lname,
849                            const struct md_attr *ma)
850 {
851         char *name = lname->ln_name;
852         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
853         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
854         struct mdd_device *mdd = mdo2mdd(pobj);
855         struct dynlock_handle *dlh;
856         struct thandle *handle;
857         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
858         int rc;
859         ENTRY;
860
861         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
862         handle = mdd_trans_start(env, mdd);
863         if (IS_ERR(handle))
864                 RETURN(PTR_ERR(handle));
865
866         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
867         if (dlh == NULL)
868                 GOTO(out_trans, rc = -ENOMEM);
869
870         rc = mdd_nr_sanity_check(env, pobj, ma);
871         if (rc)
872                 GOTO(out_unlock, rc);
873
874         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
875                                 handle, BYPASS_CAPA);
876         if (rc)
877                 GOTO(out_unlock, rc);
878
879         /*
880          * For some case, no need update obj's ctime (LA_CTIME is not set),
881          * e.g. split_dir.
882          * For other cases, update obj's ctime (LA_CTIME is set),
883          * e.g. cmr_unlink.
884          */
885         if (ma->ma_attr.la_valid & LA_CTIME) {
886                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
887                 la->la_valid = LA_CTIME | LA_MTIME;
888                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
889                                                         handle, 0);
890         }
891         EXIT;
892 out_unlock:
893         mdd_pdo_write_unlock(env, mdd_obj, dlh);
894 out_trans:
895         mdd_trans_stop(env, mdd, rc, handle);
896         return rc;
897 }
898
899 /*
900  * tobj maybe NULL
901  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
902  */
903 static int mdd_rt_sanity_check(const struct lu_env *env,
904                                struct mdd_object *tgt_pobj,
905                                struct mdd_object *tobj,
906                                struct md_attr *ma)
907 {
908         int rc;
909         ENTRY;
910
911         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
912                 RETURN(0);
913
914         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
915          * exist. In fact, tobj must exist, otherwise the call trace will be:
916          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
917          * When get here, tobj must be NOT NULL, the other case has been
918          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
919          * MDS_PERM_BYPASS.
920          * So check may_delete, but not check nlink of tgt_pobj. */
921         LASSERT(tobj);
922         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
923
924         RETURN(rc);
925 }
926
927 static int mdd_rename_tgt(const struct lu_env *env,
928                           struct md_object *pobj, struct md_object *tobj,
929                           const struct lu_fid *lf, const struct lu_name *lname,
930                           struct md_attr *ma)
931 {
932         char *name = lname->ln_name;
933         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
934         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
935         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
936         struct mdd_device *mdd = mdo2mdd(pobj);
937         struct dynlock_handle *dlh;
938         struct thandle *handle;
939         int rc;
940         ENTRY;
941
942         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
943         handle = mdd_trans_start(env, mdd);
944         if (IS_ERR(handle))
945                 RETURN(PTR_ERR(handle));
946
947         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
948         if (dlh == NULL)
949                 GOTO(out_trans, rc = -ENOMEM);
950         if (tobj)
951                 mdd_write_lock(env, mdd_tobj);
952
953         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
954         if (rc)
955                 GOTO(cleanup, rc);
956
957         /*
958          * If rename_tgt is called then we should just re-insert name with
959          * correct fid, no need to dec/inc parent nlink if obj is dir.
960          */
961         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
962         if (rc)
963                 GOTO(cleanup, rc);
964
965         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
966                                      BYPASS_CAPA);
967         if (rc)
968                 GOTO(cleanup, rc);
969
970         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
971         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
972
973         la->la_valid = LA_CTIME | LA_MTIME;
974         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
975         if (rc)
976                 GOTO(cleanup, rc);
977
978         /* 
979          * For tobj is remote case cmm layer has processed
980          * and pass NULL tobj to here. So when tobj is NOT NULL,
981          * it must be local one.
982          */
983         if (tobj && mdd_object_exists(mdd_tobj)) {
984                 __mdd_ref_del(env, mdd_tobj, handle, 0);
985
986                 /* Remove dot reference. */
987                 if (S_ISDIR(ma->ma_attr.la_mode))
988                         __mdd_ref_del(env, mdd_tobj, handle, 1);
989
990                 la->la_valid = LA_CTIME;
991                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
992                 if (rc)
993                         GOTO(cleanup, rc);
994
995                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
996                 if (rc)
997                         GOTO(cleanup, rc);
998         }
999         EXIT;
1000 cleanup:
1001         if (tobj)
1002                 mdd_write_unlock(env, mdd_tobj);
1003         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1004 out_trans:
1005         mdd_trans_stop(env, mdd, rc, handle);
1006         return rc;
1007 }
1008
1009 /*
1010  * The permission has been checked when obj created, no need check again.
1011  */
1012 static int mdd_cd_sanity_check(const struct lu_env *env,
1013                                struct mdd_object *obj)
1014 {
1015         ENTRY;
1016
1017         /* EEXIST check */
1018         if (!obj || mdd_is_dead_obj(obj))
1019                 RETURN(-ENOENT);
1020
1021         RETURN(0);
1022
1023 }
1024
1025 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1026                            struct md_object *cobj, const struct md_op_spec *spec,
1027                            struct md_attr *ma)
1028 {
1029         struct mdd_device *mdd = mdo2mdd(cobj);
1030         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1031         struct mdd_object *son = md2mdd_obj(cobj);
1032         struct lu_attr    *attr = &ma->ma_attr;
1033         struct lov_mds_md *lmm = NULL;
1034         int                lmm_size = 0;
1035         struct thandle    *handle;
1036         int                rc;
1037         ENTRY;
1038
1039         rc = mdd_cd_sanity_check(env, son);
1040         if (rc)
1041                 RETURN(rc);
1042
1043         if (!md_should_create(spec->sp_cr_flags))
1044                 RETURN(0);
1045
1046         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1047                             spec, attr);
1048         if (rc)
1049                 RETURN(rc);
1050
1051         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1052         handle = mdd_trans_start(env, mdd);
1053         if (IS_ERR(handle))
1054                 GOTO(out_free, rc = PTR_ERR(handle));
1055
1056         /*
1057          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1058          * Should this be fixed?
1059          */
1060
1061         /* Replay creates has objects already */
1062 #if 0
1063         if (spec->u.sp_ea.no_lov_create) {
1064                 CDEBUG(D_INFO, "we already have lov ea\n");
1065                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1066                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1067                                     spec->u.sp_ea.eadatalen, handle, 0);
1068         } else
1069 #endif
1070                 /* No need mdd_lsm_sanity_check here */
1071                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1072                                     lmm_size, handle, 0);
1073
1074         if (rc == 0)
1075                rc = mdd_attr_get_internal_locked(env, son, ma);
1076
1077         /* update lov_objid data, must be before transaction stop! */
1078         if (rc == 0)
1079                 mdd_lov_objid_update(mdd, lmm);
1080
1081         mdd_trans_stop(env, mdd, rc, handle);
1082 out_free:
1083         /* Finish mdd_lov_create() stuff. */
1084         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1085         RETURN(rc);
1086 }
1087
1088 static int
1089 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1090              const struct lu_name *lname, struct lu_fid* fid, int mask)
1091 {
1092         char                *name = lname->ln_name;
1093         const struct dt_key *key = (const struct dt_key *)name;
1094         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1095         struct mdd_device   *m = mdo2mdd(pobj);
1096         struct dt_object    *dir = mdd_object_child(mdd_obj);
1097         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1098         int rc;
1099         ENTRY;
1100
1101         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1102                 RETURN(-ESTALE);
1103
1104         rc = mdd_object_exists(mdd_obj);
1105         if (unlikely(rc == 0))
1106                 RETURN(-ESTALE);
1107         else if (unlikely(rc < 0)) {
1108                 CERROR("Object "DFID" locates on remote server\n",
1109                         PFID(mdo2fid(mdd_obj)));
1110                 LBUG();
1111         }
1112
1113         /* The common filename length check. */
1114         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1115                 RETURN(-ENAMETOOLONG);
1116
1117         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
1118         if (rc)
1119                 RETURN(rc);
1120
1121         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1122                    dt_try_as_dir(env, dir))) {
1123                 rc = dir->do_index_ops->dio_lookup(env, dir,
1124                                                  (struct dt_rec *)pack, key,
1125                                                  mdd_object_capa(env, mdd_obj));
1126                 if (rc == 0)
1127                         rc = fid_unpack(pack, fid);
1128         } else
1129                 rc = -ENOTDIR;
1130
1131         RETURN(rc);
1132 }
1133
1134 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1135                           struct mdd_object *child, struct md_attr *ma,
1136                           struct thandle *handle)
1137 {
1138         int rc;
1139         ENTRY;
1140
1141         /*
1142          * Update attributes for child.
1143          *
1144          * FIXME:
1145          *  (1) the valid bits should be converted between Lustre and Linux;
1146          *  (2) maybe, the child attributes should be set in OSD when creation.
1147          */
1148
1149         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1150         if (rc != 0)
1151                 RETURN(rc);
1152
1153         if (S_ISDIR(ma->ma_attr.la_mode)) {
1154                 /* Add "." and ".." for newly created dir */
1155                 __mdd_ref_add(env, child, handle);
1156                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1157                                              dot, handle, BYPASS_CAPA);
1158                 if (rc == 0) {
1159                         rc = __mdd_index_insert_only(env, child, pfid,
1160                                                      dotdot, handle,
1161                                                      BYPASS_CAPA);
1162                         if (rc != 0) {
1163                                 int rc2;
1164
1165                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1166                                                          handle, BYPASS_CAPA);
1167                                 if (rc2 != 0)
1168                                         CERROR("Failure to cleanup after dotdot"
1169                                                " creation: %d (%d)\n", rc2, rc);
1170                         }
1171                 }
1172         }
1173         RETURN(rc);
1174 }
1175
1176 /* has not lock on pobj yet */
1177 static int mdd_create_sanity_check(const struct lu_env *env,
1178                                    struct md_object *pobj,
1179                                    const struct lu_name *lname,
1180                                    struct md_attr *ma,
1181                                    struct md_op_spec *spec)
1182 {
1183         struct mdd_thread_info *info = mdd_env_info(env);
1184         struct lu_attr    *la        = &info->mti_la;
1185         struct lu_fid     *fid       = &info->mti_fid;
1186         struct mdd_object *obj       = md2mdd_obj(pobj);
1187         struct mdd_device *m         = mdo2mdd(pobj);
1188         int lookup                   = spec->sp_cr_lookup;
1189         int rc;
1190         ENTRY;
1191
1192         /* EEXIST check */
1193         if (mdd_is_dead_obj(obj))
1194                 RETURN(-ENOENT);
1195
1196         /*
1197          * In some cases this lookup is not needed - we know before if name
1198          * exists or not because MDT performs lookup for it.
1199          * name length check is done in lookup.
1200          */
1201         if (lookup) {
1202                 /*
1203                  * Check if the name already exist, though it will be checked in
1204                  * _index_insert also, for avoiding rolling back if exists
1205                  * _index_insert.
1206                  */
1207                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1208                                          MAY_WRITE | MAY_EXEC);
1209                 if (rc != -ENOENT)
1210                         RETURN(rc ? : -EEXIST);
1211         } else {
1212                 /*
1213                  * Check WRITE permission for the parent.
1214                  * EXEC permission have been checked
1215                  * when lookup before create already.
1216                  */
1217                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1218                 if (rc)
1219                         RETURN(rc);
1220         }
1221
1222         /* sgid check */
1223         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1224         if (rc != 0)
1225                 RETURN(rc);
1226
1227         if (la->la_mode & S_ISGID) {
1228                 ma->ma_attr.la_gid = la->la_gid;
1229                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1230                         ma->ma_attr.la_mode |= S_ISGID;
1231                         ma->ma_attr.la_valid |= LA_MODE;
1232                 }
1233         }
1234
1235         switch (ma->ma_attr.la_mode & S_IFMT) {
1236         case S_IFLNK: {
1237                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1238
1239                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1240                         RETURN(-ENAMETOOLONG);
1241                 else
1242                         RETURN(0);
1243         }
1244         case S_IFDIR:
1245         case S_IFREG:
1246         case S_IFCHR:
1247         case S_IFBLK:
1248         case S_IFIFO:
1249         case S_IFSOCK:
1250                 rc = 0;
1251                 break;
1252         default:
1253                 rc = -EINVAL;
1254                 break;
1255         }
1256         RETURN(rc);
1257 }
1258
1259 /*
1260  * Create object and insert it into namespace.
1261  */
1262 static int mdd_create(const struct lu_env *env,
1263                       struct md_object *pobj,
1264                       const struct lu_name *lname,
1265                       struct md_object *child,
1266                       struct md_op_spec *spec,
1267                       struct md_attr* ma)
1268 {
1269         struct mdd_thread_info *info = mdd_env_info(env);
1270         struct lu_attr         *la = &info->mti_la_for_fix;
1271         struct md_attr         *ma_acl = &info->mti_ma;
1272         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1273         struct mdd_object      *son = md2mdd_obj(child);
1274         struct mdd_device      *mdd = mdo2mdd(pobj);
1275         struct lu_attr         *attr = &ma->ma_attr;
1276         struct lov_mds_md      *lmm = NULL;
1277         struct thandle         *handle;
1278         struct dynlock_handle  *dlh;
1279         char                   *name = lname->ln_name;
1280         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1281         int got_def_acl = 0;
1282         ENTRY;
1283
1284         /*
1285          * Two operations have to be performed:
1286          *
1287          *  - an allocation of a new object (->do_create()), and
1288          *
1289          *  - an insertion into a parent index (->dio_insert()).
1290          *
1291          * Due to locking, operation order is not important, when both are
1292          * successful, *but* error handling cases are quite different:
1293          *
1294          *  - if insertion is done first, and following object creation fails,
1295          *  insertion has to be rolled back, but this operation might fail
1296          *  also leaving us with dangling index entry.
1297          *
1298          *  - if creation is done first, is has to be undone if insertion
1299          *  fails, leaving us with leaked space, which is neither good, nor
1300          *  fatal.
1301          *
1302          * It seems that creation-first is simplest solution, but it is
1303          * sub-optimal in the frequent
1304          *
1305          *         $ mkdir foo
1306          *         $ mkdir foo
1307          *
1308          * case, because second mkdir is bound to create object, only to
1309          * destroy it immediately.
1310          *
1311          * To avoid this follow local file systems that do double lookup:
1312          *
1313          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1314          *
1315          *     1. create            (mdd_object_create_internal())
1316          *
1317          *     2. insert            (__mdd_index_insert(), lookup again)
1318          */
1319
1320         /* Sanity checks before big job. */
1321         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1322         if (rc)
1323                 RETURN(rc);
1324
1325         /*
1326          * No RPC inside the transaction, so OST objects should be created at
1327          * first.
1328          */
1329         if (S_ISREG(attr->la_mode)) {
1330                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1331                                     spec, attr);
1332                 if (rc)
1333                         RETURN(rc);
1334         }
1335
1336         if (!S_ISLNK(attr->la_mode)) {
1337                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1338                 ma_acl->ma_acl = info->mti_xattr_buf;
1339                 ma_acl->ma_need = MA_ACL_DEF;
1340                 ma_acl->ma_valid = 0;
1341
1342                 mdd_read_lock(env, mdd_pobj);
1343                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1344                 mdd_read_unlock(env, mdd_pobj);
1345                 if (rc)
1346                         GOTO(out_free, rc);
1347                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1348                         got_def_acl = 1;
1349         }
1350
1351         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1352         handle = mdd_trans_start(env, mdd);
1353         if (IS_ERR(handle))
1354                 GOTO(out_free, rc = PTR_ERR(handle));
1355
1356         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1357         if (dlh == NULL)
1358                 GOTO(out_trans, rc = -ENOMEM);
1359
1360         mdd_write_lock(env, son);
1361         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
1362         if (rc) {
1363                 mdd_write_unlock(env, son);
1364                 GOTO(cleanup, rc);
1365         }
1366
1367         created = 1;
1368
1369 #ifdef CONFIG_FS_POSIX_ACL
1370         if (got_def_acl) {
1371                 struct lu_buf *acl_buf = &info->mti_buf;
1372                 acl_buf->lb_buf = ma_acl->ma_acl;
1373                 acl_buf->lb_len = ma_acl->ma_acl_size;
1374
1375                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1376                 if (rc) {
1377                         mdd_write_unlock(env, son);
1378                         GOTO(cleanup, rc);
1379                 } else {
1380                         ma->ma_attr.la_valid |= LA_MODE;
1381                 }
1382         }
1383 #endif
1384
1385         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1386                                    son, ma, handle);
1387         mdd_write_unlock(env, son);
1388         if (rc)
1389                 /*
1390                  * Object has no links, so it will be destroyed when last
1391                  * reference is released. (XXX not now.)
1392                  */
1393                 GOTO(cleanup, rc);
1394
1395         initialized = 1;
1396
1397         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1398                                 name, S_ISDIR(attr->la_mode), handle,
1399                                 mdd_object_capa(env, mdd_pobj));
1400
1401         if (rc)
1402                 GOTO(cleanup, rc);
1403
1404         inserted = 1;
1405
1406         /* No need mdd_lsm_sanity_check here */
1407         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1408         if (rc) {
1409                 CERROR("error on stripe info copy %d \n", rc);
1410                 GOTO(cleanup, rc);
1411         }
1412         if (lmm && lmm_size > 0) {
1413                 /* Set Lov here, do not get lmm again later */
1414                 memcpy(ma->ma_lmm, lmm, lmm_size);
1415                 ma->ma_lmm_size = lmm_size;
1416                 ma->ma_valid |= MA_LOV;
1417         }
1418
1419         if (S_ISLNK(attr->la_mode)) {
1420                 struct dt_object *dt = mdd_object_child(son);
1421                 const char *target_name = spec->u.sp_symname;
1422                 int sym_len = strlen(target_name);
1423                 const struct lu_buf *buf;
1424                 loff_t pos = 0;
1425
1426                 buf = mdd_buf_get_const(env, target_name, sym_len);
1427                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1428                                                 mdd_object_capa(env, son));
1429
1430                 if (rc == sym_len)
1431                         rc = 0;
1432                 else
1433                         GOTO(cleanup, rc = -EFAULT);
1434         }
1435
1436         *la = ma->ma_attr;
1437         la->la_valid = LA_CTIME | LA_MTIME;
1438         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1439         if (rc)
1440                 GOTO(cleanup, rc);
1441
1442         /* Return attr back. */
1443         rc = mdd_attr_get_internal_locked(env, son, ma);
1444         EXIT;
1445 cleanup:
1446         if (rc && created) {
1447                 int rc2 = 0;
1448
1449                 if (inserted) {
1450                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1451                                                  S_ISDIR(attr->la_mode),
1452                                                  handle, BYPASS_CAPA);
1453                         if (rc2)
1454                                 CERROR("error can not cleanup destroy %d\n",
1455                                        rc2);
1456                 }
1457
1458                 if (rc2 == 0) {
1459                         mdd_write_lock(env, son);
1460                         __mdd_ref_del(env, son, handle, 0);
1461                         if (initialized && S_ISDIR(attr->la_mode))
1462                                 __mdd_ref_del(env, son, handle, 1);
1463                         mdd_write_unlock(env, son);
1464                 }
1465         }
1466
1467         /* update lov_objid data, must be before transaction stop! */
1468         if (rc == 0)
1469                 mdd_lov_objid_update(mdd, lmm);
1470
1471         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1472 out_trans:
1473         mdd_trans_stop(env, mdd, rc, handle);
1474 out_free:
1475         /* finis lov_create stuff, free all temporary data */
1476         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1477         return rc;
1478 }
1479
1480 /*
1481  * Get locks on parents in proper order
1482  * RETURN: < 0 - error, rename_order if successful
1483  */
1484 enum rename_order {
1485         MDD_RN_SAME,
1486         MDD_RN_SRCTGT,
1487         MDD_RN_TGTSRC
1488 };
1489
1490 static int mdd_rename_order(const struct lu_env *env,
1491                             struct mdd_device *mdd,
1492                             struct mdd_object *src_pobj,
1493                             struct mdd_object *tgt_pobj)
1494 {
1495         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1496         int rc;
1497         ENTRY;
1498
1499         if (src_pobj == tgt_pobj)
1500                 RETURN(MDD_RN_SAME);
1501
1502         /* compared the parent child relationship of src_p&tgt_p */
1503         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1504                 rc = MDD_RN_SRCTGT;
1505         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1506                 rc = MDD_RN_TGTSRC;
1507         } else {
1508                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1509                 if (rc == -EREMOTE)
1510                         rc = 0;
1511
1512                 if (rc == 1)
1513                         rc = MDD_RN_TGTSRC;
1514                 else if (rc == 0)
1515                         rc = MDD_RN_SRCTGT;
1516         }
1517
1518         RETURN(rc);
1519 }
1520
1521 /* has not mdd_write{read}_lock on any obj yet. */
1522 static int mdd_rename_sanity_check(const struct lu_env *env,
1523                                    struct mdd_object *src_pobj,
1524                                    struct mdd_object *tgt_pobj,
1525                                    struct mdd_object *sobj,
1526                                    struct mdd_object *tobj,
1527                                    struct md_attr *ma)
1528 {
1529         int rc = 0;
1530         ENTRY;
1531
1532         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1533                 RETURN(0);
1534
1535         /* XXX: when get here, sobj must NOT be NULL,
1536          * the other case has been processed in cml_rename
1537          * before mdd_rename and enable MDS_PERM_BYPASS. */
1538         LASSERT(sobj);
1539         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1540         if (rc)
1541                 RETURN(rc);
1542
1543         /* XXX: when get here, "tobj == NULL" means tobj must
1544          * NOT exist (neither on remote MDS, such case has been
1545          * processed in cml_rename before mdd_rename and enable
1546          * MDS_PERM_BYPASS).
1547          * So check may_create, but not check may_unlink. */
1548         if (!tobj)
1549                 rc = mdd_may_create(env, tgt_pobj, NULL,
1550                                     (src_pobj != tgt_pobj), 0);
1551         else
1552                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1553                                     (src_pobj != tgt_pobj), 1);
1554
1555         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1556             S_ISDIR(ma->ma_attr.la_mode))
1557                 rc = __mdd_may_link(env, tgt_pobj);
1558
1559         RETURN(rc);
1560 }
1561
1562 /* src object can be remote that is why we use only fid and type of object */
1563 static int mdd_rename(const struct lu_env *env,
1564                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1565                       const struct lu_fid *lf, const struct lu_name *lsname,
1566                       struct md_object *tobj, const struct lu_name *ltname,
1567                       struct md_attr *ma)
1568 {
1569         char *sname = lsname->ln_name;
1570         char *tname = ltname->ln_name;
1571         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1572         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1573         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1574         struct mdd_device *mdd = mdo2mdd(src_pobj);
1575         struct mdd_object *mdd_sobj = NULL;
1576         struct mdd_object *mdd_tobj = NULL;
1577         struct dynlock_handle *sdlh, *tdlh;
1578         struct thandle *handle;
1579         int is_dir;
1580         int rc;
1581         ENTRY;
1582
1583         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1584         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1585
1586         if (tobj)
1587                 mdd_tobj = md2mdd_obj(tobj);
1588
1589         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1590         handle = mdd_trans_start(env, mdd);
1591         if (IS_ERR(handle))
1592                 RETURN(PTR_ERR(handle));
1593
1594         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1595         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1596         if (rc < 0)
1597                 GOTO(cleanup_unlocked, rc);
1598
1599         /* Get locks in determined order */
1600         if (rc == MDD_RN_SAME) {
1601                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1602                 /* check hashes to determine do we need one lock or two */
1603                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1604                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1605                 else
1606                         tdlh = sdlh;
1607         } else if (rc == MDD_RN_SRCTGT) {
1608                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1609                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1610         } else {
1611                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1612                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1613         }
1614         if (sdlh == NULL || tdlh == NULL)
1615                 GOTO(cleanup, rc = -ENOMEM);
1616
1617         mdd_sobj = mdd_object_find(env, mdd, lf);
1618         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1619                                      mdd_sobj, mdd_tobj, ma);
1620         if (rc)
1621                 GOTO(cleanup, rc);
1622
1623         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1624                                 mdd_object_capa(env, mdd_spobj));
1625         if (rc)
1626                 GOTO(cleanup, rc);
1627
1628         /*
1629          * Here tobj can be remote one, so we do index_delete unconditionally
1630          * and -ENOENT is allowed.
1631          */
1632         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1633                                 mdd_object_capa(env, mdd_tpobj));
1634         if (rc != 0 && rc != -ENOENT)
1635                 GOTO(cleanup, rc);
1636
1637         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1638                                 mdd_object_capa(env, mdd_tpobj));
1639         if (rc)
1640                 GOTO(cleanup, rc);
1641
1642         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1643         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1644
1645         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1646         if (mdd_sobj) {
1647                 la->la_valid = LA_CTIME;
1648                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1649                                                         handle, 0);
1650                 if (rc)
1651                         GOTO(cleanup, rc);
1652         }
1653
1654         /* 
1655          * For tobj is remote case cmm layer has processed
1656          * and set tobj to NULL then. So when tobj is NOT NULL,
1657          * it must be local one.
1658          */
1659         if (tobj && mdd_object_exists(mdd_tobj)) {
1660                 mdd_write_lock(env, mdd_tobj);
1661                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1662
1663                 /* Remove dot reference. */
1664                 if (is_dir)
1665                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1666
1667                 la->la_valid = LA_CTIME;
1668                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1669                 if (rc)
1670                         GOTO(cleanup, rc);
1671
1672                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1673                 mdd_write_unlock(env, mdd_tobj);
1674                 if (rc)
1675                         GOTO(cleanup, rc);
1676         }
1677
1678         la->la_valid = LA_CTIME | LA_MTIME;
1679         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
1680         if (rc)
1681                 GOTO(cleanup, rc);
1682
1683         if (mdd_spobj != mdd_tpobj) {
1684                 la->la_valid = LA_CTIME | LA_MTIME;
1685                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
1686                                                   handle, 0);
1687         }
1688
1689         EXIT;
1690 cleanup:
1691         if (likely(tdlh) && sdlh != tdlh)
1692                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1693         if (likely(sdlh))
1694                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1695 cleanup_unlocked:
1696         mdd_trans_stop(env, mdd, rc, handle);
1697         if (mdd_sobj)
1698                 mdd_object_put(env, mdd_sobj);
1699         return rc;
1700 }
1701
1702 struct md_dir_operations mdd_dir_ops = {
1703         .mdo_is_subdir     = mdd_is_subdir,
1704         .mdo_lookup        = mdd_lookup,
1705         .mdo_create        = mdd_create,
1706         .mdo_rename        = mdd_rename,
1707         .mdo_link          = mdd_link,
1708         .mdo_unlink        = mdd_unlink,
1709         .mdo_name_insert   = mdd_name_insert,
1710         .mdo_name_remove   = mdd_name_remove,
1711         .mdo_rename_tgt    = mdd_rename_tgt,
1712         .mdo_create_data   = mdd_create_data
1713 };