Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         (char *) dotdot,
53         sizeof(dotdot) - 1
54 };
55
56 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
57                         const struct lu_name *lname, struct lu_fid* fid,
58                         int mask);
59 static int
60 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
61                     const struct lu_name *lname, struct lu_fid* fid, int mask)
62 {
63         char *name = lname->ln_name;
64         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
65         struct dynlock_handle *dlh;
66         int rc;
67
68         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
69         if (unlikely(dlh == NULL))
70                 return -ENOMEM;
71         rc = __mdd_lookup(env, pobj, lname, fid, mask);
72         mdd_pdo_read_unlock(env, mdd_obj, dlh);
73
74         return rc;
75 }
76
77 static int mdd_lookup(const struct lu_env *env,
78                       struct md_object *pobj, const struct lu_name *lname,
79                       struct lu_fid* fid, struct md_op_spec *spec)
80 {
81         int rc;
82         ENTRY;
83         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
84         RETURN(rc);
85 }
86
87
88 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
89                           struct lu_fid *fid)
90 {
91         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
92 }
93
94 /*
95  * For root fid use special function, whcih does not compare version component
96  * of fid. Vresion component is different for root fids on all MDTs.
97  */
98 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
99 {
100         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
101                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
102 }
103
104 /*
105  * return 1: if lf is the fid of the ancestor of p1;
106  * return 0: if not;
107  *
108  * return -EREMOTE: if remote object is found, in this
109  * case fid of remote object is saved to @pf;
110  *
111  * otherwise: values < 0, errors.
112  */
113 static int mdd_is_parent(const struct lu_env *env,
114                          struct mdd_device *mdd,
115                          struct mdd_object *p1,
116                          const struct lu_fid *lf,
117                          struct lu_fid *pf)
118 {
119         struct mdd_object *parent = NULL;
120         struct lu_fid *pfid;
121         int rc;
122         ENTRY;
123
124         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
125         pfid = &mdd_env_info(env)->mti_fid;
126
127         /* Check for root first. */
128         if (mdd_is_root(mdd, mdo2fid(p1)))
129                 RETURN(0);
130
131         for(;;) {
132                 /* this is done recursively, bypass capa for each obj */
133                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
134                 rc = mdd_parent_fid(env, p1, pfid);
135                 if (rc)
136                         GOTO(out, rc);
137                 if (mdd_is_root(mdd, pfid))
138                         GOTO(out, rc = 0);
139                 if (lu_fid_eq(pfid, lf))
140                         GOTO(out, rc = 1);
141                 if (parent)
142                         mdd_object_put(env, parent);
143                 parent = mdd_object_find(env, mdd, pfid);
144
145                 /* cross-ref parent */
146                 if (parent == NULL) {
147                         if (pf != NULL)
148                                 *pf = *pfid;
149                         GOTO(out, rc = -EREMOTE);
150                 } else if (IS_ERR(parent))
151                         GOTO(out, rc = PTR_ERR(parent));
152                 p1 = parent;
153         }
154         EXIT;
155 out:
156         if (parent && !IS_ERR(parent))
157                 mdd_object_put(env, parent);
158         return rc;
159 }
160
161 /*
162  * No permission check is needed.
163  *
164  * returns 1: if fid is ancestor of @mo;
165  * returns 0: if fid is not a ancestor of @mo;
166  *
167  * returns EREMOTE if remote object is found, fid of remote object is saved to
168  * @fid;
169  *
170  * returns < 0: if error
171  */
172 static int mdd_is_subdir(const struct lu_env *env,
173                          struct md_object *mo, const struct lu_fid *fid,
174                          struct lu_fid *sfid)
175 {
176         struct mdd_device *mdd = mdo2mdd(mo);
177         int rc;
178         ENTRY;
179
180         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
181                 RETURN(0);
182
183         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
184         if (rc == 0) {
185                 /* found root */
186                 fid_zero(sfid);
187         } else if (rc == 1) {
188                 /* found @fid is parent */
189                 *sfid = *fid;
190                 rc = 0;
191         }
192         RETURN(rc);
193 }
194
195 /*
196  * Check that @dir contains no entries except (possibly) dot and dotdot.
197  *
198  * Returns:
199  *
200  *             0        empty
201  *      -ENOTDIR        not a directory object
202  *    -ENOTEMPTY        not empty
203  *           -ve        other error
204  *
205  */
206 static int mdd_dir_is_empty(const struct lu_env *env,
207                             struct mdd_object *dir)
208 {
209         struct dt_it     *it;
210         struct dt_object *obj;
211         struct dt_it_ops *iops;
212         int result;
213         ENTRY;
214
215         obj = mdd_object_child(dir);
216         if (!dt_try_as_dir(env, obj))
217                 RETURN(-ENOTDIR);
218
219         iops = &obj->do_index_ops->dio_it;
220         it = iops->init(env, obj, 0, BYPASS_CAPA);
221         if (it != NULL) {
222                 result = iops->get(env, it, (const void *)"");
223                 if (result > 0) {
224                         int i;
225                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
226                                 result = iops->next(env, it);
227                         if (result == 0)
228                                 result = -ENOTEMPTY;
229                         else if (result == +1)
230                                 result = 0;
231                 } else if (result == 0)
232                         /*
233                          * Huh? Index contains no zero key?
234                          */
235                         result = -EIO;
236
237                 iops->put(env, it);
238                 iops->fini(env, it);
239         } else
240                 result = -ENOMEM;
241         RETURN(result);
242 }
243
244 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
245 {
246         struct mdd_device *m = mdd_obj2mdd_dev(obj);
247         struct lu_attr *la = &mdd_env_info(env)->mti_la;
248         int rc;
249         ENTRY;
250
251         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
252         if (rc)
253                 RETURN(rc);
254
255         /*
256          * Subdir count limitation can be broken through.
257          */ 
258         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
259             !S_ISDIR(la->la_mode))
260                 RETURN(-EMLINK);
261         else
262                 RETURN(0);
263 }
264
265 /*
266  * Check whether it may create the cobj under the pobj.
267  * cobj maybe NULL
268  */
269 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
270                    struct mdd_object *cobj, int check_perm, int check_nlink)
271 {
272         int rc = 0;
273         ENTRY;
274
275         if (cobj && mdd_object_exists(cobj))
276                 RETURN(-EEXIST);
277
278         if (mdd_is_dead_obj(pobj))
279                 RETURN(-ENOENT);
280
281         if (check_perm)
282                 rc = mdd_permission_internal_locked(env, pobj, NULL,
283                                                     MAY_WRITE | MAY_EXEC);
284
285         if (!rc && check_nlink)
286                 rc = __mdd_may_link(env, pobj);
287
288         RETURN(rc);
289 }
290
291 /*
292  * Check whether can unlink from the pobj in the case of "cobj == NULL".
293  */
294 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
295                    const struct md_attr *ma)
296 {
297         int rc;
298         ENTRY;
299
300         if (mdd_is_dead_obj(pobj))
301                 RETURN(-ENOENT);
302
303         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
304             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
305                 RETURN(-EPERM);
306
307         rc = mdd_permission_internal_locked(env, pobj, NULL,
308                                             MAY_WRITE | MAY_EXEC);
309         if (rc)
310                 RETURN(rc);
311
312         if (mdd_is_append(pobj))
313                 RETURN(-EPERM);
314
315         RETURN(rc);
316 }
317
318 /*
319  * pobj == NULL is remote ops case, under such case, pobj's
320  * VTX feature has been checked already, no need check again.
321  */
322 static inline int mdd_is_sticky(const struct lu_env *env,
323                                 struct mdd_object *pobj,
324                                 struct mdd_object *cobj)
325 {
326         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
327         struct md_ucred *uc = md_ucred(env);
328         int rc;
329
330         if (pobj) {
331                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
332                 if (rc)
333                         return rc;
334         
335                 if (!(tmp_la->la_mode & S_ISVTX) ||
336                      (tmp_la->la_uid == uc->mu_fsuid))
337                         return 0;
338         }
339
340         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
341         if (rc) 
342                 return rc;
343         
344         if (tmp_la->la_uid == uc->mu_fsuid)
345                 return 0;
346         
347         return !mdd_capable(uc, CAP_FOWNER);
348 }
349
350 /*
351  * Check whether it may delete the cobj from the pobj.
352  * pobj maybe NULL
353  */
354 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
355                    struct mdd_object *cobj, struct md_attr *ma,
356                    int check_perm, int check_empty)
357 {
358         int rc = 0;
359         ENTRY;
360
361         LASSERT(cobj);
362         if (!mdd_object_exists(cobj))
363                 RETURN(-ENOENT);
364
365         if (pobj) {
366                 if (mdd_is_dead_obj(pobj))
367                         RETURN(-ENOENT);
368
369                 if (check_perm) {
370                         rc = mdd_permission_internal_locked(env, pobj, NULL,
371                                                     MAY_WRITE | MAY_EXEC);
372                         if (rc)
373                                 RETURN(rc);
374                 }
375
376                 if (mdd_is_append(pobj))
377                         RETURN(-EPERM);
378         }
379
380         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
381             mdd_is_sticky(env, pobj, cobj))
382                 RETURN(-EPERM);
383
384         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
385                 RETURN(-EPERM);
386
387         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
388             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
389                 RETURN(-EPERM);
390
391         if (S_ISDIR(ma->ma_attr.la_mode)) {
392                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
393
394                 if (!S_ISDIR(mdd_object_type(cobj)))
395                         RETURN(-ENOTDIR);
396
397                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
398                         RETURN(-EBUSY);
399         } else if (S_ISDIR(mdd_object_type(cobj)))
400                 RETURN(-EISDIR);
401
402         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
403                 rc = mdd_dir_is_empty(env, cobj);
404
405         RETURN(rc);
406 }
407
408 /*
409  * tgt maybe NULL
410  * has mdd_write_lock on src already, but not on tgt yet
411  */
412 int mdd_link_sanity_check(const struct lu_env *env,
413                           struct mdd_object *tgt_obj,
414                           const struct lu_name *lname,
415                           struct mdd_object *src_obj)
416 {
417         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
418         int rc = 0;
419         ENTRY;
420
421         /* Local ops, no lookup before link, check filename length here. */
422         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
423                 RETURN(-ENAMETOOLONG);
424
425         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
426                 RETURN(-EPERM);
427
428         if (S_ISDIR(mdd_object_type(src_obj)))
429                 RETURN(-EPERM);
430
431         LASSERT(src_obj != tgt_obj);
432         if (tgt_obj) {
433                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
434                 if (rc)
435                         RETURN(rc);
436         }
437
438         rc = __mdd_may_link(env, src_obj);
439
440         RETURN(rc);
441 }
442
443 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
444                                    const struct lu_fid *fid)
445 {
446         struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
447
448         fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
449         return (const struct dt_rec *)pack;
450 }
451
452 /**
453  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
454  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
455  * (maybe too large to be represented). It is a trick to break through the
456  * "i_nlink" limitation for subdir count.
457  */
458 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
459                    struct thandle *handle)
460 {
461         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
462         struct mdd_device *m = mdd_obj2mdd_dev(obj);
463
464         if (!mdd_is_mnlink(obj)) {
465                 if (S_ISDIR(mdd_object_type(obj))) {
466                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
467                                 return;
468
469                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
470                                 obj->mod_flags |= MNLINK_OBJ;
471                                 tmp_la->la_nlink = 1;
472                                 tmp_la->la_valid = LA_NLINK;
473                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
474                                                       0);
475                                 return;
476                         }
477                 }
478                 mdo_ref_add(env, obj, handle);
479         }
480 }
481
482 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
483                    struct thandle *handle, int is_dot)
484 {
485         if (!mdd_is_mnlink(obj) || is_dot)
486                 mdo_ref_del(env, obj, handle);
487 }
488
489 /* insert named index, add reference if isdir */
490 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
491                               const struct lu_fid *lf, const char *name, int is_dir,
492                               struct thandle *handle, struct lustre_capa *capa)
493 {
494         struct dt_object *next = mdd_object_child(pobj);
495         int               rc;
496         ENTRY;
497
498         if (dt_try_as_dir(env, next)) {
499                 rc = next->do_index_ops->dio_insert(env, next,
500                                                     __mdd_fid_rec(env, lf),
501                                                     (const struct dt_key *)name,
502                                                     handle, capa);
503         } else {
504                 rc = -ENOTDIR;
505         }
506
507         if (rc == 0) {
508                 if (is_dir) {
509                         mdd_write_lock(env, pobj);
510                         __mdd_ref_add(env, pobj, handle);
511                         mdd_write_unlock(env, pobj);
512                 }
513         }
514         RETURN(rc);
515 }
516
517 /* delete named index, drop reference if isdir */
518 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
519                               const char *name, int is_dir, struct thandle *handle,
520                               struct lustre_capa *capa)
521 {
522         struct dt_object *next = mdd_object_child(pobj);
523         int               rc;
524         ENTRY;
525
526         if (dt_try_as_dir(env, next)) {
527                 rc = next->do_index_ops->dio_delete(env, next,
528                                                     (struct dt_key *)name,
529                                                     handle, capa);
530                 if (rc == 0 && is_dir) {
531                         int is_dot = 0;
532
533                         if (name != NULL && name[0] == '.' && name[1] == 0)
534                                 is_dot = 1;
535                         mdd_write_lock(env, pobj);
536                         __mdd_ref_del(env, pobj, handle, is_dot);
537                         mdd_write_unlock(env, pobj);
538                 }
539         } else
540                 rc = -ENOTDIR;
541
542         RETURN(rc);
543 }
544
545 static int
546 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
547                         const struct lu_fid *lf, const char *name,
548                         struct thandle *handle, struct lustre_capa *capa)
549 {
550         struct dt_object *next = mdd_object_child(pobj);
551         int               rc;
552         ENTRY;
553
554         if (dt_try_as_dir(env, next)) {
555                 rc = next->do_index_ops->dio_insert(env, next,
556                                                     __mdd_fid_rec(env, lf),
557                                                     (const struct dt_key *)name,
558                                                     handle, capa);
559         } else {
560                 rc = -ENOTDIR;
561         }
562         RETURN(rc);
563 }
564
565 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
566                     struct md_object *src_obj, const struct lu_name *lname,
567                     struct md_attr *ma)
568 {
569         char *name = lname->ln_name;
570         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
571         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
572         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
573         struct mdd_device *mdd = mdo2mdd(src_obj);
574         struct dynlock_handle *dlh;
575         struct thandle *handle;
576         int rc;
577         ENTRY;
578
579         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
580         handle = mdd_trans_start(env, mdd);
581         if (IS_ERR(handle))
582                 RETURN(PTR_ERR(handle));
583
584         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
585         if (dlh == NULL)
586                 GOTO(out_trans, rc = -ENOMEM);
587         mdd_write_lock(env, mdd_sobj);
588
589         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
590         if (rc)
591                 GOTO(out_unlock, rc);
592
593         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
594                                      name, handle,
595                                      mdd_object_capa(env, mdd_tobj));
596         if (rc)
597                 GOTO(out_unlock, rc);
598
599         __mdd_ref_add(env, mdd_sobj, handle);
600
601         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
602         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
603
604         la->la_valid = LA_CTIME | LA_MTIME;
605         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
606         if (rc)
607                 GOTO(out_unlock, rc);
608
609         la->la_valid = LA_CTIME;
610         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
611         EXIT;
612 out_unlock:
613         mdd_write_unlock(env, mdd_sobj);
614         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
615 out_trans:
616         mdd_trans_stop(env, mdd, rc, handle);
617         return rc;
618 }
619
620 /* caller should take a lock before calling */
621 int mdd_finish_unlink(const struct lu_env *env,
622                       struct mdd_object *obj, struct md_attr *ma,
623                       struct thandle *th)
624 {
625         int rc;
626         ENTRY;
627
628         rc = mdd_iattr_get(env, obj, ma);
629         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
630                 /* add new orphan and the object
631                  * will be deleted during the object_put() */
632                 if (__mdd_orphan_add(env, obj, th) == 0)
633                         obj->mod_flags |= ORPHAN_OBJ;
634
635                 obj->mod_flags |= DEAD_OBJ;
636                 if (obj->mod_count == 0)
637                         rc = mdd_object_kill(env, obj, ma);
638                 else
639                         /* clear MA_LOV | MA_COOKIE, if we do not
640                          * unlink it in case we get it somewhere */
641                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
642         } else
643                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
644
645         RETURN(rc);
646 }
647
648 /*
649  * pobj maybe NULL
650  * has mdd_write_lock on cobj already, but not on pobj yet
651  */
652 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
653                             struct mdd_object *cobj, struct md_attr *ma)
654 {
655         int rc;
656         ENTRY;
657
658         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
659
660         RETURN(rc);
661 }
662
663 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
664                       struct md_object *cobj, const struct lu_name *lname,
665                       struct md_attr *ma)
666 {
667         char *name = lname->ln_name;
668         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
669         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
670         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
671         struct mdd_device *mdd = mdo2mdd(pobj);
672         struct dynlock_handle *dlh;
673         struct thandle    *handle;
674         int rc, is_dir;
675         ENTRY;
676
677         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
678                  PFID(mdd_object_fid(mdd_cobj)));
679
680         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
681         if (rc)
682                 RETURN(rc);
683
684         handle = mdd_trans_start(env, mdd);
685         if (IS_ERR(handle))
686                 RETURN(PTR_ERR(handle));
687
688
689         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
690         if (dlh == NULL)
691                 GOTO(out_trans, rc = -ENOMEM);
692         mdd_write_lock(env, mdd_cobj);
693
694         is_dir = S_ISDIR(ma->ma_attr.la_mode);
695         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
696         if (rc)
697                 GOTO(cleanup, rc);
698
699         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
700                                 mdd_object_capa(env, mdd_pobj));
701         if (rc)
702                 GOTO(cleanup, rc);
703
704         __mdd_ref_del(env, mdd_cobj, handle, 0);
705         if (is_dir)
706                 /* unlink dot */
707                 __mdd_ref_del(env, mdd_cobj, handle, 1);
708
709         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
710         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
711
712         la->la_valid = LA_CTIME | LA_MTIME;
713         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
714         if (rc)
715                 GOTO(cleanup, rc);
716
717         la->la_valid = LA_CTIME;
718         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
719         if (rc)
720                 GOTO(cleanup, rc);
721
722         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
723
724         if (rc == 0)
725                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
726                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
727                                    NULL, NULL);
728         EXIT;
729 cleanup:
730         mdd_write_unlock(env, mdd_cobj);
731         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
732 out_trans:
733         mdd_trans_stop(env, mdd, rc, handle);
734         return rc;
735 }
736
737 /* has not lock on pobj yet */
738 static int mdd_ni_sanity_check(const struct lu_env *env,
739                                struct md_object *pobj,
740                                const struct md_attr *ma)
741 {
742         struct mdd_object *obj = md2mdd_obj(pobj);
743         int rc;
744         ENTRY;
745
746         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
747                 RETURN(0);
748
749         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
750
751         RETURN(rc);
752 }
753
754 /*
755  * Partial operation.
756  */
757 static int mdd_name_insert(const struct lu_env *env,
758                            struct md_object *pobj,
759                            const struct lu_name *lname,
760                            const struct lu_fid *fid,
761                            const struct md_attr *ma)
762 {
763         char *name = lname->ln_name;
764         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
765         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
766         struct mdd_device *mdd = mdo2mdd(pobj);
767         struct dynlock_handle *dlh;
768         struct thandle *handle;
769         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
770         int rc;
771         ENTRY;
772
773         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
774         handle = mdd_trans_start(env, mdo2mdd(pobj));
775         if (IS_ERR(handle))
776                 RETURN(PTR_ERR(handle));
777
778         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
779         if (dlh == NULL)
780                 GOTO(out_trans, rc = -ENOMEM);
781
782         rc = mdd_ni_sanity_check(env, pobj, ma);
783         if (rc)
784                 GOTO(out_unlock, rc);
785
786         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
787                                 handle, BYPASS_CAPA);
788         if (rc)
789                 GOTO(out_unlock, rc);
790
791         /*
792          * For some case, no need update obj's ctime (LA_CTIME is not set),
793          * e.g. split_dir.
794          * For other cases, update obj's ctime (LA_CTIME is set),
795          * e.g. cmr_link.
796          */
797         if (ma->ma_attr.la_valid & LA_CTIME) {
798                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
799                 la->la_valid = LA_CTIME | LA_MTIME;
800                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
801                                                         handle, 0);
802         }
803         EXIT;
804 out_unlock:
805         mdd_pdo_write_unlock(env, mdd_obj, dlh);
806 out_trans:
807         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
808         return rc;
809 }
810
811 /* has not lock on pobj yet */
812 static int mdd_nr_sanity_check(const struct lu_env *env,
813                                struct md_object *pobj,
814                                const struct md_attr *ma)
815 {
816         struct mdd_object *obj = md2mdd_obj(pobj);
817         int rc;
818         ENTRY;
819
820         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
821                 RETURN(0);
822
823         rc = mdd_may_unlink(env, obj, ma);
824
825         RETURN(rc);
826 }
827
828 /*
829  * Partial operation.
830  */
831 static int mdd_name_remove(const struct lu_env *env,
832                            struct md_object *pobj,
833                            const struct lu_name *lname,
834                            const struct md_attr *ma)
835 {
836         char *name = lname->ln_name;
837         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
838         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
839         struct mdd_device *mdd = mdo2mdd(pobj);
840         struct dynlock_handle *dlh;
841         struct thandle *handle;
842         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
843         int rc;
844         ENTRY;
845
846         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
847         handle = mdd_trans_start(env, mdd);
848         if (IS_ERR(handle))
849                 RETURN(PTR_ERR(handle));
850
851         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
852         if (dlh == NULL)
853                 GOTO(out_trans, rc = -ENOMEM);
854
855         rc = mdd_nr_sanity_check(env, pobj, ma);
856         if (rc)
857                 GOTO(out_unlock, rc);
858
859         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
860                                 handle, BYPASS_CAPA);
861         if (rc)
862                 GOTO(out_unlock, rc);
863
864         /*
865          * For some case, no need update obj's ctime (LA_CTIME is not set),
866          * e.g. split_dir.
867          * For other cases, update obj's ctime (LA_CTIME is set),
868          * e.g. cmr_unlink.
869          */
870         if (ma->ma_attr.la_valid & LA_CTIME) {
871                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
872                 la->la_valid = LA_CTIME | LA_MTIME;
873                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
874                                                         handle, 0);
875         }
876         EXIT;
877 out_unlock:
878         mdd_pdo_write_unlock(env, mdd_obj, dlh);
879 out_trans:
880         mdd_trans_stop(env, mdd, rc, handle);
881         return rc;
882 }
883
884 /*
885  * tobj maybe NULL
886  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
887  */
888 static int mdd_rt_sanity_check(const struct lu_env *env,
889                                struct mdd_object *tgt_pobj,
890                                struct mdd_object *tobj,
891                                struct md_attr *ma)
892 {
893         int rc;
894         ENTRY;
895
896         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
897                 RETURN(0);
898
899         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
900          * exist. In fact, tobj must exist, otherwise the call trace will be:
901          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
902          * When get here, tobj must be NOT NULL, the other case has been
903          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
904          * MDS_PERM_BYPASS.
905          * So check may_delete, but not check nlink of tgt_pobj. */
906         LASSERT(tobj);
907         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
908
909         RETURN(rc);
910 }
911
912 static int mdd_rename_tgt(const struct lu_env *env,
913                           struct md_object *pobj, struct md_object *tobj,
914                           const struct lu_fid *lf, const struct lu_name *lname,
915                           struct md_attr *ma)
916 {
917         char *name = lname->ln_name;
918         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
919         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
920         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
921         struct mdd_device *mdd = mdo2mdd(pobj);
922         struct dynlock_handle *dlh;
923         struct thandle *handle;
924         int rc;
925         ENTRY;
926
927         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
928         handle = mdd_trans_start(env, mdd);
929         if (IS_ERR(handle))
930                 RETURN(PTR_ERR(handle));
931
932         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
933         if (dlh == NULL)
934                 GOTO(out_trans, rc = -ENOMEM);
935         if (tobj)
936                 mdd_write_lock(env, mdd_tobj);
937
938         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
939         if (rc)
940                 GOTO(cleanup, rc);
941
942         /*
943          * If rename_tgt is called then we should just re-insert name with
944          * correct fid, no need to dec/inc parent nlink if obj is dir.
945          */
946         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
947         if (rc)
948                 GOTO(cleanup, rc);
949
950         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
951                                      BYPASS_CAPA);
952         if (rc)
953                 GOTO(cleanup, rc);
954
955         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
956         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
957
958         la->la_valid = LA_CTIME | LA_MTIME;
959         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
960         if (rc)
961                 GOTO(cleanup, rc);
962
963         /* 
964          * For tobj is remote case cmm layer has processed
965          * and pass NULL tobj to here. So when tobj is NOT NULL,
966          * it must be local one.
967          */
968         if (tobj && mdd_object_exists(mdd_tobj)) {
969                 __mdd_ref_del(env, mdd_tobj, handle, 0);
970
971                 /* Remove dot reference. */
972                 if (S_ISDIR(ma->ma_attr.la_mode))
973                         __mdd_ref_del(env, mdd_tobj, handle, 1);
974
975                 la->la_valid = LA_CTIME;
976                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
977                 if (rc)
978                         GOTO(cleanup, rc);
979
980                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
981                 if (rc)
982                         GOTO(cleanup, rc);
983         }
984         EXIT;
985 cleanup:
986         if (tobj)
987                 mdd_write_unlock(env, mdd_tobj);
988         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
989 out_trans:
990         mdd_trans_stop(env, mdd, rc, handle);
991         return rc;
992 }
993
994 /*
995  * The permission has been checked when obj created, no need check again.
996  */
997 static int mdd_cd_sanity_check(const struct lu_env *env,
998                                struct mdd_object *obj)
999 {
1000         ENTRY;
1001
1002         /* EEXIST check */
1003         if (!obj || mdd_is_dead_obj(obj))
1004                 RETURN(-ENOENT);
1005
1006         RETURN(0);
1007
1008 }
1009
1010 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1011                            struct md_object *cobj, const struct md_op_spec *spec,
1012                            struct md_attr *ma)
1013 {
1014         struct mdd_device *mdd = mdo2mdd(cobj);
1015         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1016         struct mdd_object *son = md2mdd_obj(cobj);
1017         struct lu_attr    *attr = &ma->ma_attr;
1018         struct lov_mds_md *lmm = NULL;
1019         int                lmm_size = 0;
1020         struct thandle    *handle;
1021         int                rc;
1022         ENTRY;
1023
1024         rc = mdd_cd_sanity_check(env, son);
1025         if (rc)
1026                 RETURN(rc);
1027
1028         if (!md_should_create(spec->sp_cr_flags))
1029                 RETURN(0);
1030
1031         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1032                             spec, attr);
1033         if (rc)
1034                 RETURN(rc);
1035
1036         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1037         handle = mdd_trans_start(env, mdd);
1038         if (IS_ERR(handle))
1039                 GOTO(out_free, rc = PTR_ERR(handle));
1040
1041         /*
1042          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1043          * Should this be fixed?
1044          */
1045
1046         /* Replay creates has objects already */
1047 #if 0
1048         if (spec->u.sp_ea.no_lov_create) {
1049                 CDEBUG(D_INFO, "we already have lov ea\n");
1050                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1051                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1052                                     spec->u.sp_ea.eadatalen, handle, 0);
1053         } else
1054 #endif
1055                 /* No need mdd_lsm_sanity_check here */
1056                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1057                                     lmm_size, handle, 0);
1058
1059         if (rc == 0)
1060                rc = mdd_attr_get_internal_locked(env, son, ma);
1061
1062         /* update lov_objid data, must be before transaction stop! */
1063         if (rc == 0)
1064                 mdd_lov_objid_update(mdd, lmm);
1065
1066         mdd_trans_stop(env, mdd, rc, handle);
1067 out_free:
1068         /* Finish mdd_lov_create() stuff. */
1069         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1070         RETURN(rc);
1071 }
1072
1073 static int
1074 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1075              const struct lu_name *lname, struct lu_fid* fid, int mask)
1076 {
1077         char                *name = lname->ln_name;
1078         const struct dt_key *key = (const struct dt_key *)name;
1079         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1080         struct mdd_device   *m = mdo2mdd(pobj);
1081         struct dt_object    *dir = mdd_object_child(mdd_obj);
1082         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1083         int rc;
1084         ENTRY;
1085
1086         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1087                 RETURN(-ESTALE);
1088
1089         rc = mdd_object_exists(mdd_obj);
1090         if (unlikely(rc == 0))
1091                 RETURN(-ESTALE);
1092         else if (unlikely(rc < 0)) {
1093                 CERROR("Object "DFID" locates on remote server\n",
1094                         PFID(mdo2fid(mdd_obj)));
1095                 LBUG();
1096         }
1097
1098         /* The common filename length check. */
1099         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1100                 RETURN(-ENAMETOOLONG);
1101
1102         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
1103         if (rc)
1104                 RETURN(rc);
1105
1106         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1107                    dt_try_as_dir(env, dir))) {
1108                 rc = dir->do_index_ops->dio_lookup(env, dir,
1109                                                  (struct dt_rec *)pack, key,
1110                                                  mdd_object_capa(env, mdd_obj));
1111                 if (rc == 0)
1112                         rc = fid_unpack(pack, fid);
1113         } else
1114                 rc = -ENOTDIR;
1115
1116         RETURN(rc);
1117 }
1118
1119 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1120                           struct mdd_object *child, struct md_attr *ma,
1121                           struct thandle *handle)
1122 {
1123         int rc;
1124         ENTRY;
1125
1126         /*
1127          * Update attributes for child.
1128          *
1129          * FIXME:
1130          *  (1) the valid bits should be converted between Lustre and Linux;
1131          *  (2) maybe, the child attributes should be set in OSD when creation.
1132          */
1133
1134         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1135         if (rc != 0)
1136                 RETURN(rc);
1137
1138         if (S_ISDIR(ma->ma_attr.la_mode)) {
1139                 /* Add "." and ".." for newly created dir */
1140                 __mdd_ref_add(env, child, handle);
1141                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1142                                              dot, handle, BYPASS_CAPA);
1143                 if (rc == 0) {
1144                         rc = __mdd_index_insert_only(env, child, pfid,
1145                                                      dotdot, handle,
1146                                                      BYPASS_CAPA);
1147                         if (rc != 0) {
1148                                 int rc2;
1149
1150                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1151                                                          handle, BYPASS_CAPA);
1152                                 if (rc2 != 0)
1153                                         CERROR("Failure to cleanup after dotdot"
1154                                                " creation: %d (%d)\n", rc2, rc);
1155                         }
1156                 }
1157         }
1158         RETURN(rc);
1159 }
1160
1161 /* has not lock on pobj yet */
1162 static int mdd_create_sanity_check(const struct lu_env *env,
1163                                    struct md_object *pobj,
1164                                    const struct lu_name *lname,
1165                                    struct md_attr *ma,
1166                                    struct md_op_spec *spec)
1167 {
1168         struct mdd_thread_info *info = mdd_env_info(env);
1169         struct lu_attr    *la        = &info->mti_la;
1170         struct lu_fid     *fid       = &info->mti_fid;
1171         struct mdd_object *obj       = md2mdd_obj(pobj);
1172         struct mdd_device *m         = mdo2mdd(pobj);
1173         int lookup                   = spec->sp_cr_lookup;
1174         int rc;
1175         ENTRY;
1176
1177         /* EEXIST check */
1178         if (mdd_is_dead_obj(obj))
1179                 RETURN(-ENOENT);
1180
1181         /*
1182          * In some cases this lookup is not needed - we know before if name
1183          * exists or not because MDT performs lookup for it.
1184          * name length check is done in lookup.
1185          */
1186         if (lookup) {
1187                 /*
1188                  * Check if the name already exist, though it will be checked in
1189                  * _index_insert also, for avoiding rolling back if exists
1190                  * _index_insert.
1191                  */
1192                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1193                                          MAY_WRITE | MAY_EXEC);
1194                 if (rc != -ENOENT)
1195                         RETURN(rc ? : -EEXIST);
1196         } else {
1197                 /*
1198                  * Check WRITE permission for the parent.
1199                  * EXEC permission have been checked
1200                  * when lookup before create already.
1201                  */
1202                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1203                 if (rc)
1204                         RETURN(rc);
1205         }
1206
1207         /* sgid check */
1208         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1209         if (rc != 0)
1210                 RETURN(rc);
1211
1212         if (la->la_mode & S_ISGID) {
1213                 ma->ma_attr.la_gid = la->la_gid;
1214                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1215                         ma->ma_attr.la_mode |= S_ISGID;
1216                         ma->ma_attr.la_valid |= LA_MODE;
1217                 }
1218         }
1219
1220         switch (ma->ma_attr.la_mode & S_IFMT) {
1221         case S_IFLNK: {
1222                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1223
1224                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1225                         RETURN(-ENAMETOOLONG);
1226                 else
1227                         RETURN(0);
1228         }
1229         case S_IFDIR:
1230         case S_IFREG:
1231         case S_IFCHR:
1232         case S_IFBLK:
1233         case S_IFIFO:
1234         case S_IFSOCK:
1235                 rc = 0;
1236                 break;
1237         default:
1238                 rc = -EINVAL;
1239                 break;
1240         }
1241         RETURN(rc);
1242 }
1243
1244 /*
1245  * Create object and insert it into namespace.
1246  */
1247 static int mdd_create(const struct lu_env *env,
1248                       struct md_object *pobj,
1249                       const struct lu_name *lname,
1250                       struct md_object *child,
1251                       struct md_op_spec *spec,
1252                       struct md_attr* ma)
1253 {
1254         char *name = lname->ln_name;
1255         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1256         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1257         struct mdd_object *son = md2mdd_obj(child);
1258         struct mdd_device *mdd = mdo2mdd(pobj);
1259         struct lu_attr    *attr = &ma->ma_attr;
1260         struct lov_mds_md *lmm = NULL;
1261         struct thandle    *handle;
1262         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1263         struct dynlock_handle *dlh;
1264         ENTRY;
1265
1266         /*
1267          * Two operations have to be performed:
1268          *
1269          *  - allocation of new object (->do_create()), and
1270          *
1271          *  - insertion into parent index (->dio_insert()).
1272          *
1273          * Due to locking, operation order is not important, when both are
1274          * successful, *but* error handling cases are quite different:
1275          *
1276          *  - if insertion is done first, and following object creation fails,
1277          *  insertion has to be rolled back, but this operation might fail
1278          *  also leaving us with dangling index entry.
1279          *
1280          *  - if creation is done first, is has to be undone if insertion
1281          *  fails, leaving us with leaked space, which is neither good, nor
1282          *  fatal.
1283          *
1284          * It seems that creation-first is simplest solution, but it is
1285          * sub-optimal in the frequent
1286          *
1287          *         $ mkdir foo
1288          *         $ mkdir foo
1289          *
1290          * case, because second mkdir is bound to create object, only to
1291          * destroy it immediately.
1292          *
1293          * To avoid this follow local file systems that do double lookup:
1294          *
1295          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1296          *
1297          *     1. create            (mdd_object_create_internal())
1298          *
1299          *     2. insert            (__mdd_index_insert(), lookup again)
1300          */
1301
1302         /* Sanity checks before big job. */
1303         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1304         if (rc)
1305                 RETURN(rc);
1306
1307         /*
1308          * No RPC inside the transaction, so OST objects should be created at
1309          * first.
1310          */
1311         if (S_ISREG(attr->la_mode)) {
1312                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1313                                     spec, attr);
1314                 if (rc)
1315                         RETURN(rc);
1316         }
1317
1318         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1319         handle = mdd_trans_start(env, mdd);
1320         if (IS_ERR(handle))
1321                 GOTO(out_free, rc = PTR_ERR(handle));
1322
1323         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1324         if (dlh == NULL)
1325                 GOTO(out_trans, rc = -ENOMEM);
1326
1327         /*
1328          * XXX: Check that link can be added to the parent in mkdir case.
1329          */
1330
1331         mdd_write_lock(env, son);
1332         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
1333         if (rc) {
1334                 mdd_write_unlock(env, son);
1335                 GOTO(cleanup, rc);
1336         }
1337
1338         created = 1;
1339
1340 #ifdef CONFIG_FS_POSIX_ACL
1341         mdd_read_lock(env, mdd_pobj);
1342         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1343         mdd_read_unlock(env, mdd_pobj);
1344         if (rc) {
1345                 mdd_write_unlock(env, son);
1346                 GOTO(cleanup, rc);
1347         } else {
1348                 ma->ma_attr.la_valid |= LA_MODE;
1349         }
1350 #endif
1351
1352         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1353                                    son, ma, handle);
1354         mdd_write_unlock(env, son);
1355         if (rc)
1356                 /*
1357                  * Object has no links, so it will be destroyed when last
1358                  * reference is released. (XXX not now.)
1359                  */
1360                 GOTO(cleanup, rc);
1361
1362         initialized = 1;
1363
1364         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1365                                 name, S_ISDIR(attr->la_mode), handle,
1366                                 mdd_object_capa(env, mdd_pobj));
1367
1368         if (rc)
1369                 GOTO(cleanup, rc);
1370
1371         inserted = 1;
1372
1373         /* No need mdd_lsm_sanity_check here */
1374         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1375         if (rc) {
1376                 CERROR("error on stripe info copy %d \n", rc);
1377                 GOTO(cleanup, rc);
1378         }
1379         if (lmm && lmm_size > 0) {
1380                 /* Set Lov here, do not get lmm again later */
1381                 memcpy(ma->ma_lmm, lmm, lmm_size);
1382                 ma->ma_lmm_size = lmm_size;
1383                 ma->ma_valid |= MA_LOV;
1384         }
1385
1386         if (S_ISLNK(attr->la_mode)) {
1387                 struct dt_object *dt = mdd_object_child(son);
1388                 const char *target_name = spec->u.sp_symname;
1389                 int sym_len = strlen(target_name);
1390                 const struct lu_buf *buf;
1391                 loff_t pos = 0;
1392
1393                 buf = mdd_buf_get_const(env, target_name, sym_len);
1394                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1395                                                 mdd_object_capa(env, son));
1396
1397                 if (rc == sym_len)
1398                         rc = 0;
1399                 else
1400                         GOTO(cleanup, rc = -EFAULT);
1401         }
1402
1403         *la = ma->ma_attr;
1404         la->la_valid = LA_CTIME | LA_MTIME;
1405         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1406         if (rc)
1407                 GOTO(cleanup, rc);
1408
1409         /* Return attr back. */
1410         rc = mdd_attr_get_internal_locked(env, son, ma);
1411         EXIT;
1412 cleanup:
1413         if (rc && created) {
1414                 int rc2 = 0;
1415
1416                 if (inserted) {
1417                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1418                                                  S_ISDIR(attr->la_mode),
1419                                                  handle, BYPASS_CAPA);
1420                         if (rc2)
1421                                 CERROR("error can not cleanup destroy %d\n",
1422                                        rc2);
1423                 }
1424
1425                 if (rc2 == 0) {
1426                         mdd_write_lock(env, son);
1427                         __mdd_ref_del(env, son, handle, 0);
1428                         if (initialized && S_ISDIR(attr->la_mode))
1429                                 __mdd_ref_del(env, son, handle, 1);
1430                         mdd_write_unlock(env, son);
1431                 }
1432         }
1433
1434         /* update lov_objid data, must be before transaction stop! */
1435         if (rc == 0)
1436                 mdd_lov_objid_update(mdd, lmm);
1437
1438         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1439 out_trans:
1440         mdd_trans_stop(env, mdd, rc, handle);
1441 out_free:
1442         /* finis lov_create stuff, free all temporary data */
1443         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1444         return rc;
1445 }
1446
1447 /*
1448  * Get locks on parents in proper order
1449  * RETURN: < 0 - error, rename_order if successful
1450  */
1451 enum rename_order {
1452         MDD_RN_SAME,
1453         MDD_RN_SRCTGT,
1454         MDD_RN_TGTSRC
1455 };
1456
1457 static int mdd_rename_order(const struct lu_env *env,
1458                             struct mdd_device *mdd,
1459                             struct mdd_object *src_pobj,
1460                             struct mdd_object *tgt_pobj)
1461 {
1462         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1463         int rc;
1464         ENTRY;
1465
1466         if (src_pobj == tgt_pobj)
1467                 RETURN(MDD_RN_SAME);
1468
1469         /* compared the parent child relationship of src_p&tgt_p */
1470         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1471                 rc = MDD_RN_SRCTGT;
1472         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1473                 rc = MDD_RN_TGTSRC;
1474         } else {
1475                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1476                 if (rc == -EREMOTE)
1477                         rc = 0;
1478
1479                 if (rc == 1)
1480                         rc = MDD_RN_TGTSRC;
1481                 else if (rc == 0)
1482                         rc = MDD_RN_SRCTGT;
1483         }
1484
1485         RETURN(rc);
1486 }
1487
1488 /* has not mdd_write{read}_lock on any obj yet. */
1489 static int mdd_rename_sanity_check(const struct lu_env *env,
1490                                    struct mdd_object *src_pobj,
1491                                    struct mdd_object *tgt_pobj,
1492                                    struct mdd_object *sobj,
1493                                    struct mdd_object *tobj,
1494                                    struct md_attr *ma)
1495 {
1496         int rc = 0;
1497         ENTRY;
1498
1499         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1500                 RETURN(0);
1501
1502         /* XXX: when get here, sobj must NOT be NULL,
1503          * the other case has been processed in cml_rename
1504          * before mdd_rename and enable MDS_PERM_BYPASS. */
1505         LASSERT(sobj);
1506         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1507         if (rc)
1508                 RETURN(rc);
1509
1510         /* XXX: when get here, "tobj == NULL" means tobj must
1511          * NOT exist (neither on remote MDS, such case has been
1512          * processed in cml_rename before mdd_rename and enable
1513          * MDS_PERM_BYPASS).
1514          * So check may_create, but not check may_unlink. */
1515         if (!tobj)
1516                 rc = mdd_may_create(env, tgt_pobj, NULL,
1517                                     (src_pobj != tgt_pobj), 0);
1518         else
1519                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1520                                     (src_pobj != tgt_pobj), 1);
1521
1522         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1523             S_ISDIR(ma->ma_attr.la_mode))
1524                 rc = __mdd_may_link(env, tgt_pobj);
1525
1526         RETURN(rc);
1527 }
1528
1529 /* src object can be remote that is why we use only fid and type of object */
1530 static int mdd_rename(const struct lu_env *env,
1531                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1532                       const struct lu_fid *lf, const struct lu_name *lsname,
1533                       struct md_object *tobj, const struct lu_name *ltname,
1534                       struct md_attr *ma)
1535 {
1536         char *sname = lsname->ln_name;
1537         char *tname = ltname->ln_name;
1538         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1539         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1540         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1541         struct mdd_device *mdd = mdo2mdd(src_pobj);
1542         struct mdd_object *mdd_sobj = NULL;
1543         struct mdd_object *mdd_tobj = NULL;
1544         struct dynlock_handle *sdlh, *tdlh;
1545         struct thandle *handle;
1546         int is_dir;
1547         int rc;
1548         ENTRY;
1549
1550         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1551         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1552
1553         if (tobj)
1554                 mdd_tobj = md2mdd_obj(tobj);
1555
1556         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1557         handle = mdd_trans_start(env, mdd);
1558         if (IS_ERR(handle))
1559                 RETURN(PTR_ERR(handle));
1560
1561         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1562         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1563         if (rc < 0)
1564                 GOTO(cleanup_unlocked, rc);
1565
1566         /* Get locks in determined order */
1567         if (rc == MDD_RN_SAME) {
1568                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1569                 /* check hashes to determine do we need one lock or two */
1570                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1571                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1572                 else
1573                         tdlh = sdlh;
1574         } else if (rc == MDD_RN_SRCTGT) {
1575                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1576                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1577         } else {
1578                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1579                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1580         }
1581         if (sdlh == NULL || tdlh == NULL)
1582                 GOTO(cleanup, rc = -ENOMEM);
1583
1584         mdd_sobj = mdd_object_find(env, mdd, lf);
1585         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1586                                      mdd_sobj, mdd_tobj, ma);
1587         if (rc)
1588                 GOTO(cleanup, rc);
1589
1590         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1591                                 mdd_object_capa(env, mdd_spobj));
1592         if (rc)
1593                 GOTO(cleanup, rc);
1594
1595         /*
1596          * Here tobj can be remote one, so we do index_delete unconditionally
1597          * and -ENOENT is allowed.
1598          */
1599         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1600                                 mdd_object_capa(env, mdd_tpobj));
1601         if (rc != 0 && rc != -ENOENT)
1602                 GOTO(cleanup, rc);
1603
1604         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1605                                 mdd_object_capa(env, mdd_tpobj));
1606         if (rc)
1607                 GOTO(cleanup, rc);
1608
1609         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1610         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1611
1612         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1613         if (mdd_sobj) {
1614                 la->la_valid = LA_CTIME;
1615                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1616                                                         handle, 0);
1617                 if (rc)
1618                         GOTO(cleanup, rc);
1619         }
1620
1621         /* 
1622          * For tobj is remote case cmm layer has processed
1623          * and set tobj to NULL then. So when tobj is NOT NULL,
1624          * it must be local one.
1625          */
1626         if (tobj && mdd_object_exists(mdd_tobj)) {
1627                 mdd_write_lock(env, mdd_tobj);
1628                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1629
1630                 /* Remove dot reference. */
1631                 if (is_dir)
1632                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1633
1634                 la->la_valid = LA_CTIME;
1635                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1636                 if (rc)
1637                         GOTO(cleanup, rc);
1638
1639                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1640                 mdd_write_unlock(env, mdd_tobj);
1641                 if (rc)
1642                         GOTO(cleanup, rc);
1643         }
1644
1645         la->la_valid = LA_CTIME | LA_MTIME;
1646         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
1647         if (rc)
1648                 GOTO(cleanup, rc);
1649
1650         if (mdd_spobj != mdd_tpobj) {
1651                 la->la_valid = LA_CTIME | LA_MTIME;
1652                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
1653                                                   handle, 0);
1654         }
1655
1656         EXIT;
1657 cleanup:
1658         if (likely(tdlh) && sdlh != tdlh)
1659                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1660         if (likely(sdlh))
1661                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1662 cleanup_unlocked:
1663         mdd_trans_stop(env, mdd, rc, handle);
1664         if (mdd_sobj)
1665                 mdd_object_put(env, mdd_sobj);
1666         return rc;
1667 }
1668
1669 struct md_dir_operations mdd_dir_ops = {
1670         .mdo_is_subdir     = mdd_is_subdir,
1671         .mdo_lookup        = mdd_lookup,
1672         .mdo_create        = mdd_create,
1673         .mdo_rename        = mdd_rename,
1674         .mdo_link          = mdd_link,
1675         .mdo_unlink        = mdd_unlink,
1676         .mdo_name_insert   = mdd_name_insert,
1677         .mdo_name_remove   = mdd_name_remove,
1678         .mdo_rename_tgt    = mdd_rename_tgt,
1679         .mdo_create_data   = mdd_create_data
1680 };