Whamcloud - gitweb
cleanup usage obd_set_info_async, obd_get_info.
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static struct lu_name lname_dotdot = {
52         (char *) dotdot,
53         sizeof(dotdot) - 1
54 };
55
56 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
57                         const struct lu_name *lname, struct lu_fid* fid,
58                         int mask);
59 static int
60 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
61                     const struct lu_name *lname, struct lu_fid* fid, int mask)
62 {
63         char *name = lname->ln_name;
64         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
65         struct dynlock_handle *dlh;
66         int rc;
67
68         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
69         if (unlikely(dlh == NULL))
70                 return -ENOMEM;
71         rc = __mdd_lookup(env, pobj, lname, fid, mask);
72         mdd_pdo_read_unlock(env, mdd_obj, dlh);
73
74         return rc;
75 }
76
77 static int mdd_lookup(const struct lu_env *env,
78                       struct md_object *pobj, const struct lu_name *lname,
79                       struct lu_fid* fid, struct md_op_spec *spec)
80 {
81         int rc;
82         ENTRY;
83         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
84         RETURN(rc);
85 }
86
87
88 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
89                           struct lu_fid *fid)
90 {
91         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
92 }
93
94 /*
95  * For root fid use special function, whcih does not compare version component
96  * of fid. Vresion component is different for root fids on all MDTs.
97  */
98 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
99 {
100         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
101                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
102 }
103
104 /*
105  * return 1: if lf is the fid of the ancestor of p1;
106  * return 0: if not;
107  *
108  * return -EREMOTE: if remote object is found, in this
109  * case fid of remote object is saved to @pf;
110  *
111  * otherwise: values < 0, errors.
112  */
113 static int mdd_is_parent(const struct lu_env *env,
114                          struct mdd_device *mdd,
115                          struct mdd_object *p1,
116                          const struct lu_fid *lf,
117                          struct lu_fid *pf)
118 {
119         struct mdd_object *parent = NULL;
120         struct lu_fid *pfid;
121         int rc;
122         ENTRY;
123
124         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
125         pfid = &mdd_env_info(env)->mti_fid;
126
127         /* Check for root first. */
128         if (mdd_is_root(mdd, mdo2fid(p1)))
129                 RETURN(0);
130
131         for(;;) {
132                 /* this is done recursively, bypass capa for each obj */
133                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
134                 rc = mdd_parent_fid(env, p1, pfid);
135                 if (rc)
136                         GOTO(out, rc);
137                 if (mdd_is_root(mdd, pfid))
138                         GOTO(out, rc = 0);
139                 if (lu_fid_eq(pfid, lf))
140                         GOTO(out, rc = 1);
141                 if (parent)
142                         mdd_object_put(env, parent);
143                 parent = mdd_object_find(env, mdd, pfid);
144
145                 /* cross-ref parent */
146                 if (parent == NULL) {
147                         if (pf != NULL)
148                                 *pf = *pfid;
149                         GOTO(out, rc = -EREMOTE);
150                 } else if (IS_ERR(parent))
151                         GOTO(out, rc = PTR_ERR(parent));
152                 p1 = parent;
153         }
154         EXIT;
155 out:
156         if (parent && !IS_ERR(parent))
157                 mdd_object_put(env, parent);
158         return rc;
159 }
160
161 /*
162  * No permission check is needed.
163  *
164  * returns 1: if fid is ancestor of @mo;
165  * returns 0: if fid is not a ancestor of @mo;
166  *
167  * returns EREMOTE if remote object is found, fid of remote object is saved to
168  * @fid;
169  *
170  * returns < 0: if error
171  */
172 static int mdd_is_subdir(const struct lu_env *env,
173                          struct md_object *mo, const struct lu_fid *fid,
174                          struct lu_fid *sfid)
175 {
176         struct mdd_device *mdd = mdo2mdd(mo);
177         int rc;
178         ENTRY;
179
180         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
181                 RETURN(0);
182
183         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
184         if (rc == 0) {
185                 /* found root */
186                 fid_zero(sfid);
187         } else if (rc == 1) {
188                 /* found @fid is parent */
189                 *sfid = *fid;
190                 rc = 0;
191         }
192         RETURN(rc);
193 }
194
195 /*
196  * Check that @dir contains no entries except (possibly) dot and dotdot.
197  *
198  * Returns:
199  *
200  *             0        empty
201  *      -ENOTDIR        not a directory object
202  *    -ENOTEMPTY        not empty
203  *           -ve        other error
204  *
205  */
206 static int mdd_dir_is_empty(const struct lu_env *env,
207                             struct mdd_object *dir)
208 {
209         struct dt_it     *it;
210         struct dt_object *obj;
211         struct dt_it_ops *iops;
212         int result;
213         ENTRY;
214
215         obj = mdd_object_child(dir);
216         if (!dt_try_as_dir(env, obj))
217                 RETURN(-ENOTDIR);
218
219         iops = &obj->do_index_ops->dio_it;
220         it = iops->init(env, obj, 0, BYPASS_CAPA);
221         if (it != NULL) {
222                 result = iops->get(env, it, (const void *)"");
223                 if (result > 0) {
224                         int i;
225                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
226                                 result = iops->next(env, it);
227                         if (result == 0)
228                                 result = -ENOTEMPTY;
229                         else if (result == +1)
230                                 result = 0;
231                 } else if (result == 0)
232                         /*
233                          * Huh? Index contains no zero key?
234                          */
235                         result = -EIO;
236
237                 iops->put(env, it);
238                 iops->fini(env, it);
239         } else
240                 result = -ENOMEM;
241         RETURN(result);
242 }
243
244 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
245 {
246         struct mdd_device *m = mdd_obj2mdd_dev(obj);
247         struct lu_attr *la = &mdd_env_info(env)->mti_la;
248         int rc;
249         ENTRY;
250
251         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
252         if (rc)
253                 RETURN(rc);
254
255         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
256                 RETURN(-EMLINK);
257         else
258                 RETURN(0);
259 }
260
261 /*
262  * Check whether it may create the cobj under the pobj.
263  * cobj maybe NULL
264  */
265 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
266                    struct mdd_object *cobj, int check_perm, int check_nlink)
267 {
268         int rc = 0;
269         ENTRY;
270
271         if (cobj && mdd_object_exists(cobj))
272                 RETURN(-EEXIST);
273
274         if (mdd_is_dead_obj(pobj))
275                 RETURN(-ENOENT);
276
277         if (check_perm)
278                 rc = mdd_permission_internal_locked(env, pobj, NULL,
279                                                     MAY_WRITE | MAY_EXEC);
280
281         if (!rc && check_nlink)
282                 rc = __mdd_may_link(env, pobj);
283
284         RETURN(rc);
285 }
286
287 /*
288  * Check whether can unlink from the pobj in the case of "cobj == NULL".
289  */
290 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
291                    const struct md_attr *ma)
292 {
293         int rc;
294         ENTRY;
295
296         if (mdd_is_dead_obj(pobj))
297                 RETURN(-ENOENT);
298
299         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
300             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
301                 RETURN(-EPERM);
302
303         rc = mdd_permission_internal_locked(env, pobj, NULL,
304                                             MAY_WRITE | MAY_EXEC);
305         if (rc)
306                 RETURN(rc);
307
308         if (mdd_is_append(pobj))
309                 RETURN(-EPERM);
310
311         RETURN(rc);
312 }
313
314 /*
315  * pobj == NULL is remote ops case, under such case, pobj's
316  * VTX feature has been checked already, no need check again.
317  */
318 static inline int mdd_is_sticky(const struct lu_env *env,
319                                 struct mdd_object *pobj,
320                                 struct mdd_object *cobj)
321 {
322         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
323         struct md_ucred *uc = md_ucred(env);
324         int rc;
325
326         if (pobj) {
327                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
328                 if (rc)
329                         return rc;
330         
331                 if (!(tmp_la->la_mode & S_ISVTX) ||
332                      (tmp_la->la_uid == uc->mu_fsuid))
333                         return 0;
334         }
335
336         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
337         if (rc) 
338                 return rc;
339         
340         if (tmp_la->la_uid == uc->mu_fsuid)
341                 return 0;
342         
343         return !mdd_capable(uc, CAP_FOWNER);
344 }
345
346 /*
347  * Check whether it may delete the cobj from the pobj.
348  * pobj maybe NULL
349  */
350 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
351                    struct mdd_object *cobj, struct md_attr *ma,
352                    int check_perm, int check_empty)
353 {
354         int rc = 0;
355         ENTRY;
356
357         LASSERT(cobj);
358         if (!mdd_object_exists(cobj))
359                 RETURN(-ENOENT);
360
361         if (pobj) {
362                 if (mdd_is_dead_obj(pobj))
363                         RETURN(-ENOENT);
364
365                 if (check_perm) {
366                         rc = mdd_permission_internal_locked(env, pobj, NULL,
367                                                     MAY_WRITE | MAY_EXEC);
368                         if (rc)
369                                 RETURN(rc);
370                 }
371
372                 if (mdd_is_append(pobj))
373                         RETURN(-EPERM);
374         }
375
376         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
377             mdd_is_sticky(env, pobj, cobj))
378                 RETURN(-EPERM);
379
380         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
381                 RETURN(-EPERM);
382
383         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
384             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
385                 RETURN(-EPERM);
386
387         if (S_ISDIR(ma->ma_attr.la_mode)) {
388                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
389
390                 if (!S_ISDIR(mdd_object_type(cobj)))
391                         RETURN(-ENOTDIR);
392
393                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
394                         RETURN(-EBUSY);
395         } else if (S_ISDIR(mdd_object_type(cobj)))
396                 RETURN(-EISDIR);
397
398         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
399                 rc = mdd_dir_is_empty(env, cobj);
400
401         RETURN(rc);
402 }
403
404 /*
405  * tgt maybe NULL
406  * has mdd_write_lock on src already, but not on tgt yet
407  */
408 int mdd_link_sanity_check(const struct lu_env *env,
409                           struct mdd_object *tgt_obj,
410                           const struct lu_name *lname,
411                           struct mdd_object *src_obj)
412 {
413         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
414         int rc = 0;
415         ENTRY;
416
417         /* Local ops, no lookup before link, check filename length here. */
418         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
419                 RETURN(-ENAMETOOLONG);
420
421         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
422                 RETURN(-EPERM);
423
424         if (S_ISDIR(mdd_object_type(src_obj)))
425                 RETURN(-EPERM);
426
427         LASSERT(src_obj != tgt_obj);
428         if (tgt_obj) {
429                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
430                 if (rc)
431                         RETURN(rc);
432         }
433
434         rc = __mdd_may_link(env, src_obj);
435
436         RETURN(rc);
437 }
438
439 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
440                                    const struct lu_fid *fid)
441 {
442         struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
443
444         fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
445         return (const struct dt_rec *)pack;
446 }
447
448
449 /* insert named index, add reference if isdir */
450 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
451                               const struct lu_fid *lf, const char *name, int is_dir,
452                               struct thandle *handle, struct lustre_capa *capa)
453 {
454         struct dt_object *next = mdd_object_child(pobj);
455         int               rc;
456         ENTRY;
457
458         if (dt_try_as_dir(env, next)) {
459                 rc = next->do_index_ops->dio_insert(env, next,
460                                                     __mdd_fid_rec(env, lf),
461                                                     (const struct dt_key *)name,
462                                                     handle, capa);
463         } else {
464                 rc = -ENOTDIR;
465         }
466
467         if (rc == 0) {
468                 if (is_dir) {
469                         mdd_write_lock(env, pobj);
470                         mdo_ref_add(env, pobj, handle);
471                         mdd_write_unlock(env, pobj);
472                 }
473         }
474         RETURN(rc);
475 }
476
477 /* delete named index, drop reference if isdir */
478 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
479                               const char *name, int is_dir, struct thandle *handle,
480                               struct lustre_capa *capa)
481 {
482         struct dt_object *next = mdd_object_child(pobj);
483         int               rc;
484         ENTRY;
485
486         if (dt_try_as_dir(env, next)) {
487                 rc = next->do_index_ops->dio_delete(env, next,
488                                                     (struct dt_key *)name,
489                                                     handle, capa);
490                 if (rc == 0 && is_dir) {
491                         mdd_write_lock(env, pobj);
492                         mdo_ref_del(env, pobj, handle);
493                         mdd_write_unlock(env, pobj);
494                 }
495         } else
496                 rc = -ENOTDIR;
497
498         RETURN(rc);
499 }
500
501 static int
502 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
503                         const struct lu_fid *lf, const char *name,
504                         struct thandle *handle, struct lustre_capa *capa)
505 {
506         struct dt_object *next = mdd_object_child(pobj);
507         int               rc;
508         ENTRY;
509
510         if (dt_try_as_dir(env, next)) {
511                 rc = next->do_index_ops->dio_insert(env, next,
512                                                     __mdd_fid_rec(env, lf),
513                                                     (const struct dt_key *)name,
514                                                     handle, capa);
515         } else {
516                 rc = -ENOTDIR;
517         }
518         RETURN(rc);
519 }
520
521 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
522                     struct md_object *src_obj, const struct lu_name *lname,
523                     struct md_attr *ma)
524 {
525         char *name = lname->ln_name;
526         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
527         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
528         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
529         struct mdd_device *mdd = mdo2mdd(src_obj);
530         struct dynlock_handle *dlh;
531         struct thandle *handle;
532         int rc;
533         ENTRY;
534
535         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
536         handle = mdd_trans_start(env, mdd);
537         if (IS_ERR(handle))
538                 RETURN(PTR_ERR(handle));
539
540         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
541         if (dlh == NULL)
542                 GOTO(out_trans, rc = -ENOMEM);
543         mdd_write_lock(env, mdd_sobj);
544
545         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
546         if (rc)
547                 GOTO(out_unlock, rc);
548
549         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
550                                      name, handle,
551                                      mdd_object_capa(env, mdd_tobj));
552         if (rc)
553                 GOTO(out_unlock, rc);
554
555         mdo_ref_add(env, mdd_sobj, handle);
556
557         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
558         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
559
560         la->la_valid = LA_CTIME | LA_MTIME;
561         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
562         if (rc)
563                 GOTO(out_unlock, rc);
564
565         la->la_valid = LA_CTIME;
566         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
567         EXIT;
568 out_unlock:
569         mdd_write_unlock(env, mdd_sobj);
570         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
571 out_trans:
572         mdd_trans_stop(env, mdd, rc, handle);
573         return rc;
574 }
575
576 /* caller should take a lock before calling */
577 int mdd_finish_unlink(const struct lu_env *env,
578                       struct mdd_object *obj, struct md_attr *ma,
579                       struct thandle *th)
580 {
581         int rc;
582         ENTRY;
583
584         rc = mdd_iattr_get(env, obj, ma);
585         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
586                 /* add new orphan and the object
587                  * will be deleted during the object_put() */
588                 if (__mdd_orphan_add(env, obj, th) == 0)
589                         obj->mod_flags |= ORPHAN_OBJ;
590
591                 obj->mod_flags |= DEAD_OBJ;
592                 if (obj->mod_count == 0)
593                         rc = mdd_object_kill(env, obj, ma);
594                 else
595                         /* clear MA_LOV | MA_COOKIE, if we do not
596                          * unlink it in case we get it somewhere */
597                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
598         } else
599                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
600
601         RETURN(rc);
602 }
603
604 /*
605  * pobj maybe NULL
606  * has mdd_write_lock on cobj already, but not on pobj yet
607  */
608 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
609                             struct mdd_object *cobj, struct md_attr *ma)
610 {
611         int rc;
612         ENTRY;
613
614         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
615
616         RETURN(rc);
617 }
618
619 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
620                       struct md_object *cobj, const struct lu_name *lname,
621                       struct md_attr *ma)
622 {
623         char *name = lname->ln_name;
624         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
625         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
626         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
627         struct mdd_device *mdd = mdo2mdd(pobj);
628         struct dynlock_handle *dlh;
629         struct thandle    *handle;
630         int rc, is_dir;
631         ENTRY;
632
633         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
634                  PFID(mdd_object_fid(mdd_cobj)));
635
636         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
637         if (rc)
638                 RETURN(rc);
639
640         handle = mdd_trans_start(env, mdd);
641         if (IS_ERR(handle))
642                 RETURN(PTR_ERR(handle));
643
644
645         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
646         if (dlh == NULL)
647                 GOTO(out_trans, rc = -ENOMEM);
648         mdd_write_lock(env, mdd_cobj);
649
650         is_dir = S_ISDIR(ma->ma_attr.la_mode);
651         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
652         if (rc)
653                 GOTO(cleanup, rc);
654
655         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
656                                 mdd_object_capa(env, mdd_pobj));
657         if (rc)
658                 GOTO(cleanup, rc);
659
660         mdo_ref_del(env, mdd_cobj, handle);
661         if (is_dir)
662                 /* unlink dot */
663                 mdo_ref_del(env, mdd_cobj, handle);
664
665         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
666         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
667
668         la->la_valid = LA_CTIME | LA_MTIME;
669         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
670         if (rc)
671                 GOTO(cleanup, rc);
672
673         la->la_valid = LA_CTIME;
674         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
675         if (rc)
676                 GOTO(cleanup, rc);
677
678         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
679
680         if (rc == 0)
681                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
682                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
683                                    NULL, NULL);
684         EXIT;
685 cleanup:
686         mdd_write_unlock(env, mdd_cobj);
687         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
688 out_trans:
689         mdd_trans_stop(env, mdd, rc, handle);
690         return rc;
691 }
692
693 /* has not lock on pobj yet */
694 static int mdd_ni_sanity_check(const struct lu_env *env,
695                                struct md_object *pobj,
696                                const struct md_attr *ma)
697 {
698         struct mdd_object *obj = md2mdd_obj(pobj);
699         int rc;
700         ENTRY;
701
702         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
703                 RETURN(0);
704
705         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
706
707         RETURN(rc);
708 }
709
710 /*
711  * Partial operation.
712  */
713 static int mdd_name_insert(const struct lu_env *env,
714                            struct md_object *pobj,
715                            const struct lu_name *lname,
716                            const struct lu_fid *fid,
717                            const struct md_attr *ma)
718 {
719         char *name = lname->ln_name;
720         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
721         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
722         struct mdd_device *mdd = mdo2mdd(pobj);
723         struct dynlock_handle *dlh;
724         struct thandle *handle;
725         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
726         int rc;
727         ENTRY;
728
729         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
730         handle = mdd_trans_start(env, mdo2mdd(pobj));
731         if (IS_ERR(handle))
732                 RETURN(PTR_ERR(handle));
733
734         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
735         if (dlh == NULL)
736                 GOTO(out_trans, rc = -ENOMEM);
737
738         rc = mdd_ni_sanity_check(env, pobj, ma);
739         if (rc)
740                 GOTO(out_unlock, rc);
741
742         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
743                                 handle, BYPASS_CAPA);
744         if (rc)
745                 GOTO(out_unlock, rc);
746
747         /*
748          * For some case, no need update obj's ctime (LA_CTIME is not set),
749          * e.g. split_dir.
750          * For other cases, update obj's ctime (LA_CTIME is set),
751          * e.g. cmr_link.
752          */
753         if (ma->ma_attr.la_valid & LA_CTIME) {
754                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
755                 la->la_valid = LA_CTIME | LA_MTIME;
756                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
757                                                         handle, 0);
758         }
759         EXIT;
760 out_unlock:
761         mdd_pdo_write_unlock(env, mdd_obj, dlh);
762 out_trans:
763         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
764         return rc;
765 }
766
767 /* has not lock on pobj yet */
768 static int mdd_nr_sanity_check(const struct lu_env *env,
769                                struct md_object *pobj,
770                                const struct md_attr *ma)
771 {
772         struct mdd_object *obj = md2mdd_obj(pobj);
773         int rc;
774         ENTRY;
775
776         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
777                 RETURN(0);
778
779         rc = mdd_may_unlink(env, obj, ma);
780
781         RETURN(rc);
782 }
783
784 /*
785  * Partial operation.
786  */
787 static int mdd_name_remove(const struct lu_env *env,
788                            struct md_object *pobj,
789                            const struct lu_name *lname,
790                            const struct md_attr *ma)
791 {
792         char *name = lname->ln_name;
793         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
794         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
795         struct mdd_device *mdd = mdo2mdd(pobj);
796         struct dynlock_handle *dlh;
797         struct thandle *handle;
798         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
799         int rc;
800         ENTRY;
801
802         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
803         handle = mdd_trans_start(env, mdd);
804         if (IS_ERR(handle))
805                 RETURN(PTR_ERR(handle));
806
807         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
808         if (dlh == NULL)
809                 GOTO(out_trans, rc = -ENOMEM);
810
811         rc = mdd_nr_sanity_check(env, pobj, ma);
812         if (rc)
813                 GOTO(out_unlock, rc);
814
815         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
816                                 handle, BYPASS_CAPA);
817         if (rc)
818                 GOTO(out_unlock, rc);
819
820         /*
821          * For some case, no need update obj's ctime (LA_CTIME is not set),
822          * e.g. split_dir.
823          * For other cases, update obj's ctime (LA_CTIME is set),
824          * e.g. cmr_unlink.
825          */
826         if (ma->ma_attr.la_valid & LA_CTIME) {
827                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
828                 la->la_valid = LA_CTIME | LA_MTIME;
829                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
830                                                         handle, 0);
831         }
832         EXIT;
833 out_unlock:
834         mdd_pdo_write_unlock(env, mdd_obj, dlh);
835 out_trans:
836         mdd_trans_stop(env, mdd, rc, handle);
837         return rc;
838 }
839
840 /*
841  * tobj maybe NULL
842  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
843  */
844 static int mdd_rt_sanity_check(const struct lu_env *env,
845                                struct mdd_object *tgt_pobj,
846                                struct mdd_object *tobj,
847                                struct md_attr *ma)
848 {
849         int rc;
850         ENTRY;
851
852         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
853                 RETURN(0);
854
855         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
856          * exist. In fact, tobj must exist, otherwise the call trace will be:
857          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
858          * When get here, tobj must be NOT NULL, the other case has been
859          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
860          * MDS_PERM_BYPASS.
861          * So check may_delete, but not check nlink of tgt_pobj. */
862         LASSERT(tobj);
863         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
864
865         RETURN(rc);
866 }
867
868 static int mdd_rename_tgt(const struct lu_env *env,
869                           struct md_object *pobj, struct md_object *tobj,
870                           const struct lu_fid *lf, const struct lu_name *lname,
871                           struct md_attr *ma)
872 {
873         char *name = lname->ln_name;
874         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
875         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
876         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
877         struct mdd_device *mdd = mdo2mdd(pobj);
878         struct dynlock_handle *dlh;
879         struct thandle *handle;
880         int rc;
881         ENTRY;
882
883         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
884         handle = mdd_trans_start(env, mdd);
885         if (IS_ERR(handle))
886                 RETURN(PTR_ERR(handle));
887
888         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
889         if (dlh == NULL)
890                 GOTO(out_trans, rc = -ENOMEM);
891         if (tobj)
892                 mdd_write_lock(env, mdd_tobj);
893
894         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
895         if (rc)
896                 GOTO(cleanup, rc);
897
898         /*
899          * If rename_tgt is called then we should just re-insert name with
900          * correct fid, no need to dec/inc parent nlink if obj is dir.
901          */
902         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
903         if (rc)
904                 GOTO(cleanup, rc);
905
906         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
907                                      BYPASS_CAPA);
908         if (rc)
909                 GOTO(cleanup, rc);
910
911         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
912         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
913
914         la->la_valid = LA_CTIME | LA_MTIME;
915         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
916         if (rc)
917                 GOTO(cleanup, rc);
918
919         /* 
920          * For tobj is remote case cmm layer has processed
921          * and pass NULL tobj to here. So when tobj is NOT NULL,
922          * it must be local one.
923          */
924         if (tobj && mdd_object_exists(mdd_tobj)) {
925                 mdo_ref_del(env, mdd_tobj, handle);
926
927                 /* Remove dot reference. */
928                 if (S_ISDIR(ma->ma_attr.la_mode))
929                         mdo_ref_del(env, mdd_tobj, handle);
930
931                 la->la_valid = LA_CTIME;
932                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
933                 if (rc)
934                         GOTO(cleanup, rc);
935
936                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
937                 if (rc)
938                         GOTO(cleanup, rc);
939         }
940         EXIT;
941 cleanup:
942         if (tobj)
943                 mdd_write_unlock(env, mdd_tobj);
944         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
945 out_trans:
946         mdd_trans_stop(env, mdd, rc, handle);
947         return rc;
948 }
949
950 /*
951  * The permission has been checked when obj created, no need check again.
952  */
953 static int mdd_cd_sanity_check(const struct lu_env *env,
954                                struct mdd_object *obj)
955 {
956         ENTRY;
957
958         /* EEXIST check */
959         if (!obj || mdd_is_dead_obj(obj))
960                 RETURN(-ENOENT);
961
962         RETURN(0);
963
964 }
965
966 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
967                            struct md_object *cobj, const struct md_op_spec *spec,
968                            struct md_attr *ma)
969 {
970         struct mdd_device *mdd = mdo2mdd(cobj);
971         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
972         struct mdd_object *son = md2mdd_obj(cobj);
973         struct lu_attr    *attr = &ma->ma_attr;
974         struct lov_mds_md *lmm = NULL;
975         int                lmm_size = 0;
976         struct thandle    *handle;
977         int                rc;
978         ENTRY;
979
980         rc = mdd_cd_sanity_check(env, son);
981         if (rc)
982                 RETURN(rc);
983
984         if (!md_should_create(spec->sp_cr_flags))
985                 RETURN(0);
986
987         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
988                             spec, attr);
989         if (rc)
990                 RETURN(rc);
991
992         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
993         handle = mdd_trans_start(env, mdd);
994         if (IS_ERR(handle))
995                 GOTO(out_free, rc = PTR_ERR(handle));
996
997         /*
998          * XXX: Setting the lov ea is not locked but setting the attr is locked?
999          * Should this be fixed?
1000          */
1001
1002         /* Replay creates has objects already */
1003 #if 0
1004         if (spec->u.sp_ea.no_lov_create) {
1005                 CDEBUG(D_INFO, "we already have lov ea\n");
1006                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1007                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1008                                     spec->u.sp_ea.eadatalen, handle, 0);
1009         } else
1010 #endif
1011                 /* No need mdd_lsm_sanity_check here */
1012                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1013                                     lmm_size, handle, 0);
1014
1015         if (rc == 0)
1016                rc = mdd_attr_get_internal_locked(env, son, ma);
1017
1018         /* update lov_objid data, must be before transaction stop! */
1019         if (rc == 0)
1020                 mdd_lov_objid_update(mdd, lmm);
1021
1022         mdd_trans_stop(env, mdd, rc, handle);
1023 out_free:
1024         /* Finish mdd_lov_create() stuff. */
1025         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1026         RETURN(rc);
1027 }
1028
1029 static int
1030 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1031              const struct lu_name *lname, struct lu_fid* fid, int mask)
1032 {
1033         char                *name = lname->ln_name;
1034         const struct dt_key *key = (const struct dt_key *)name;
1035         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1036         struct mdd_device   *m = mdo2mdd(pobj);
1037         struct dt_object    *dir = mdd_object_child(mdd_obj);
1038         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1039         int rc;
1040         ENTRY;
1041
1042         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1043                 RETURN(-ESTALE);
1044
1045         rc = mdd_object_exists(mdd_obj);
1046         if (unlikely(rc == 0))
1047                 RETURN(-ESTALE);
1048         else if (unlikely(rc < 0)) {
1049                 CERROR("Object "DFID" locates on remote server\n",
1050                         PFID(mdo2fid(mdd_obj)));
1051                 LBUG();
1052         }
1053
1054         /* The common filename length check. */
1055         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1056                 RETURN(-ENAMETOOLONG);
1057
1058         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
1059         if (rc)
1060                 RETURN(rc);
1061
1062         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1063                    dt_try_as_dir(env, dir))) {
1064                 rc = dir->do_index_ops->dio_lookup(env, dir,
1065                                                  (struct dt_rec *)pack, key,
1066                                                  mdd_object_capa(env, mdd_obj));
1067                 if (rc == 0)
1068                         rc = fid_unpack(pack, fid);
1069         } else
1070                 rc = -ENOTDIR;
1071
1072         RETURN(rc);
1073 }
1074
1075 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1076                           struct mdd_object *child, struct md_attr *ma,
1077                           struct thandle *handle)
1078 {
1079         int rc;
1080         ENTRY;
1081
1082         /*
1083          * Update attributes for child.
1084          *
1085          * FIXME:
1086          *  (1) the valid bits should be converted between Lustre and Linux;
1087          *  (2) maybe, the child attributes should be set in OSD when creation.
1088          */
1089
1090         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1091         if (rc != 0)
1092                 RETURN(rc);
1093
1094         if (S_ISDIR(ma->ma_attr.la_mode)) {
1095                 /* Add "." and ".." for newly created dir */
1096                 mdo_ref_add(env, child, handle);
1097                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1098                                              dot, handle, BYPASS_CAPA);
1099                 if (rc == 0) {
1100                         rc = __mdd_index_insert_only(env, child, pfid,
1101                                                      dotdot, handle,
1102                                                      BYPASS_CAPA);
1103                         if (rc != 0) {
1104                                 int rc2;
1105
1106                                 rc2 = __mdd_index_delete(env, child, dot, 0,
1107                                                          handle, BYPASS_CAPA);
1108                                 if (rc2 != 0)
1109                                         CERROR("Failure to cleanup after dotdot"
1110                                                " creation: %d (%d)\n", rc2, rc);
1111                                 else
1112                                         mdo_ref_del(env, child, handle);
1113                         }
1114                 }
1115         }
1116         RETURN(rc);
1117 }
1118
1119 /* has not lock on pobj yet */
1120 static int mdd_create_sanity_check(const struct lu_env *env,
1121                                    struct md_object *pobj,
1122                                    const struct lu_name *lname,
1123                                    struct md_attr *ma,
1124                                    struct md_op_spec *spec)
1125 {
1126         struct mdd_thread_info *info = mdd_env_info(env);
1127         struct lu_attr    *la        = &info->mti_la;
1128         struct lu_fid     *fid       = &info->mti_fid;
1129         struct mdd_object *obj       = md2mdd_obj(pobj);
1130         struct mdd_device *m         = mdo2mdd(pobj);
1131         int lookup                   = spec->sp_cr_lookup;
1132         int rc;
1133         ENTRY;
1134
1135         /* EEXIST check */
1136         if (mdd_is_dead_obj(obj))
1137                 RETURN(-ENOENT);
1138
1139         /*
1140          * In some cases this lookup is not needed - we know before if name
1141          * exists or not because MDT performs lookup for it.
1142          * name length check is done in lookup.
1143          */
1144         if (lookup) {
1145                 /*
1146                  * Check if the name already exist, though it will be checked in
1147                  * _index_insert also, for avoiding rolling back if exists
1148                  * _index_insert.
1149                  */
1150                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1151                                          MAY_WRITE | MAY_EXEC);
1152                 if (rc != -ENOENT)
1153                         RETURN(rc ? : -EEXIST);
1154         } else {
1155                 /*
1156                  * Check WRITE permission for the parent.
1157                  * EXEC permission have been checked
1158                  * when lookup before create already.
1159                  */
1160                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1161                 if (rc)
1162                         RETURN(rc);
1163         }
1164
1165         /* sgid check */
1166         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1167         if (rc != 0)
1168                 RETURN(rc);
1169
1170         if (la->la_mode & S_ISGID) {
1171                 ma->ma_attr.la_gid = la->la_gid;
1172                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1173                         ma->ma_attr.la_mode |= S_ISGID;
1174                         ma->ma_attr.la_valid |= LA_MODE;
1175                 }
1176         }
1177
1178         switch (ma->ma_attr.la_mode & S_IFMT) {
1179         case S_IFDIR: {
1180                 if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
1181                         RETURN(-EMLINK);
1182                 else
1183                         RETURN(0);
1184         }
1185         case S_IFLNK: {
1186                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1187
1188                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1189                         RETURN(-ENAMETOOLONG);
1190                 else
1191                         RETURN(0);
1192         }
1193         case S_IFREG:
1194         case S_IFCHR:
1195         case S_IFBLK:
1196         case S_IFIFO:
1197         case S_IFSOCK:
1198                 rc = 0;
1199                 break;
1200         default:
1201                 rc = -EINVAL;
1202                 break;
1203         }
1204         RETURN(rc);
1205 }
1206
1207 /*
1208  * Create object and insert it into namespace.
1209  */
1210 static int mdd_create(const struct lu_env *env,
1211                       struct md_object *pobj,
1212                       const struct lu_name *lname,
1213                       struct md_object *child,
1214                       struct md_op_spec *spec,
1215                       struct md_attr* ma)
1216 {
1217         char *name = lname->ln_name;
1218         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1219         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1220         struct mdd_object *son = md2mdd_obj(child);
1221         struct mdd_device *mdd = mdo2mdd(pobj);
1222         struct lu_attr    *attr = &ma->ma_attr;
1223         struct lov_mds_md *lmm = NULL;
1224         struct thandle    *handle;
1225         int rc, created = 0, inserted = 0, lmm_size = 0;
1226         struct dynlock_handle *dlh;
1227         ENTRY;
1228
1229         /*
1230          * Two operations have to be performed:
1231          *
1232          *  - allocation of new object (->do_create()), and
1233          *
1234          *  - insertion into parent index (->dio_insert()).
1235          *
1236          * Due to locking, operation order is not important, when both are
1237          * successful, *but* error handling cases are quite different:
1238          *
1239          *  - if insertion is done first, and following object creation fails,
1240          *  insertion has to be rolled back, but this operation might fail
1241          *  also leaving us with dangling index entry.
1242          *
1243          *  - if creation is done first, is has to be undone if insertion
1244          *  fails, leaving us with leaked space, which is neither good, nor
1245          *  fatal.
1246          *
1247          * It seems that creation-first is simplest solution, but it is
1248          * sub-optimal in the frequent
1249          *
1250          *         $ mkdir foo
1251          *         $ mkdir foo
1252          *
1253          * case, because second mkdir is bound to create object, only to
1254          * destroy it immediately.
1255          *
1256          * To avoid this follow local file systems that do double lookup:
1257          *
1258          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1259          *
1260          *     1. create            (mdd_object_create_internal())
1261          *
1262          *     2. insert            (__mdd_index_insert(), lookup again)
1263          */
1264
1265         /* Sanity checks before big job. */
1266         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1267         if (rc)
1268                 RETURN(rc);
1269
1270         /*
1271          * No RPC inside the transaction, so OST objects should be created at
1272          * first.
1273          */
1274         if (S_ISREG(attr->la_mode)) {
1275                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1276                                     spec, attr);
1277                 if (rc)
1278                         RETURN(rc);
1279         }
1280
1281         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1282         handle = mdd_trans_start(env, mdd);
1283         if (IS_ERR(handle))
1284                 GOTO(out_free, rc = PTR_ERR(handle));
1285
1286         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1287         if (dlh == NULL)
1288                 GOTO(out_trans, rc = -ENOMEM);
1289
1290         /*
1291          * XXX: Check that link can be added to the parent in mkdir case.
1292          */
1293
1294         mdd_write_lock(env, son);
1295         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
1296         if (rc) {
1297                 mdd_write_unlock(env, son);
1298                 GOTO(cleanup, rc);
1299         }
1300
1301         created = 1;
1302
1303 #ifdef CONFIG_FS_POSIX_ACL
1304         mdd_read_lock(env, mdd_pobj);
1305         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1306         mdd_read_unlock(env, mdd_pobj);
1307         if (rc) {
1308                 mdd_write_unlock(env, son);
1309                 GOTO(cleanup, rc);
1310         } else {
1311                 ma->ma_attr.la_valid |= LA_MODE;
1312         }
1313 #endif
1314
1315         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1316                                    son, ma, handle);
1317         mdd_write_unlock(env, son);
1318         if (rc)
1319                 /*
1320                  * Object has no links, so it will be destroyed when last
1321                  * reference is released. (XXX not now.)
1322                  */
1323                 GOTO(cleanup, rc);
1324
1325         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1326                                 name, S_ISDIR(attr->la_mode), handle,
1327                                 mdd_object_capa(env, mdd_pobj));
1328
1329         if (rc)
1330                 GOTO(cleanup, rc);
1331
1332         inserted = 1;
1333
1334         /* No need mdd_lsm_sanity_check here */
1335         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1336         if (rc) {
1337                 CERROR("error on stripe info copy %d \n", rc);
1338                 GOTO(cleanup, rc);
1339         }
1340         if (lmm && lmm_size > 0) {
1341                 /* Set Lov here, do not get lmm again later */
1342                 memcpy(ma->ma_lmm, lmm, lmm_size);
1343                 ma->ma_lmm_size = lmm_size;
1344                 ma->ma_valid |= MA_LOV;
1345         }
1346
1347         if (S_ISLNK(attr->la_mode)) {
1348                 struct dt_object *dt = mdd_object_child(son);
1349                 const char *target_name = spec->u.sp_symname;
1350                 int sym_len = strlen(target_name);
1351                 const struct lu_buf *buf;
1352                 loff_t pos = 0;
1353
1354                 buf = mdd_buf_get_const(env, target_name, sym_len);
1355                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1356                                                 mdd_object_capa(env, son));
1357
1358                 if (rc == sym_len)
1359                         rc = 0;
1360                 else
1361                         GOTO(cleanup, rc = -EFAULT);
1362         }
1363
1364         *la = ma->ma_attr;
1365         la->la_valid = LA_CTIME | LA_MTIME;
1366         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1367         if (rc)
1368                 GOTO(cleanup, rc);
1369
1370         /* Return attr back. */
1371         rc = mdd_attr_get_internal_locked(env, son, ma);
1372         EXIT;
1373 cleanup:
1374         if (rc && created) {
1375                 int rc2 = 0;
1376
1377                 if (inserted) {
1378                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1379                                                  S_ISDIR(attr->la_mode),
1380                                                  handle, BYPASS_CAPA);
1381                         if (rc2)
1382                                 CERROR("error can not cleanup destroy %d\n",
1383                                        rc2);
1384                 }
1385                 if (rc2 == 0) {
1386                         mdd_write_lock(env, son);
1387                         mdo_ref_del(env, son, handle);
1388                         mdd_write_unlock(env, son);
1389                 }
1390         }
1391
1392         /* update lov_objid data, must be before transaction stop! */
1393         if (rc == 0)
1394                 mdd_lov_objid_update(mdd, lmm);
1395
1396         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1397 out_trans:
1398         mdd_trans_stop(env, mdd, rc, handle);
1399 out_free:
1400         /* finis lov_create stuff, free all temporary data */
1401         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1402         return rc;
1403 }
1404
1405 /*
1406  * Get locks on parents in proper order
1407  * RETURN: < 0 - error, rename_order if successful
1408  */
1409 enum rename_order {
1410         MDD_RN_SAME,
1411         MDD_RN_SRCTGT,
1412         MDD_RN_TGTSRC
1413 };
1414
1415 static int mdd_rename_order(const struct lu_env *env,
1416                             struct mdd_device *mdd,
1417                             struct mdd_object *src_pobj,
1418                             struct mdd_object *tgt_pobj)
1419 {
1420         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1421         int rc;
1422         ENTRY;
1423
1424         if (src_pobj == tgt_pobj)
1425                 RETURN(MDD_RN_SAME);
1426
1427         /* compared the parent child relationship of src_p&tgt_p */
1428         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1429                 rc = MDD_RN_SRCTGT;
1430         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1431                 rc = MDD_RN_TGTSRC;
1432         } else {
1433                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1434                 if (rc == -EREMOTE)
1435                         rc = 0;
1436
1437                 if (rc == 1)
1438                         rc = MDD_RN_TGTSRC;
1439                 else if (rc == 0)
1440                         rc = MDD_RN_SRCTGT;
1441         }
1442
1443         RETURN(rc);
1444 }
1445
1446 /* has not mdd_write{read}_lock on any obj yet. */
1447 static int mdd_rename_sanity_check(const struct lu_env *env,
1448                                    struct mdd_object *src_pobj,
1449                                    struct mdd_object *tgt_pobj,
1450                                    struct mdd_object *sobj,
1451                                    struct mdd_object *tobj,
1452                                    struct md_attr *ma)
1453 {
1454         int rc = 0;
1455         ENTRY;
1456
1457         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1458                 RETURN(0);
1459
1460         /* XXX: when get here, sobj must NOT be NULL,
1461          * the other case has been processed in cml_rename
1462          * before mdd_rename and enable MDS_PERM_BYPASS. */
1463         LASSERT(sobj);
1464         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1465         if (rc)
1466                 RETURN(rc);
1467
1468         /* XXX: when get here, "tobj == NULL" means tobj must
1469          * NOT exist (neither on remote MDS, such case has been
1470          * processed in cml_rename before mdd_rename and enable
1471          * MDS_PERM_BYPASS).
1472          * So check may_create, but not check may_unlink. */
1473         if (!tobj)
1474                 rc = mdd_may_create(env, tgt_pobj, NULL,
1475                                     (src_pobj != tgt_pobj), 0);
1476         else
1477                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1478                                     (src_pobj != tgt_pobj), 1);
1479
1480         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1481             S_ISDIR(ma->ma_attr.la_mode))
1482                 rc = __mdd_may_link(env, tgt_pobj);
1483
1484         RETURN(rc);
1485 }
1486
1487 /* src object can be remote that is why we use only fid and type of object */
1488 static int mdd_rename(const struct lu_env *env,
1489                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1490                       const struct lu_fid *lf, const struct lu_name *lsname,
1491                       struct md_object *tobj, const struct lu_name *ltname,
1492                       struct md_attr *ma)
1493 {
1494         char *sname = lsname->ln_name;
1495         char *tname = ltname->ln_name;
1496         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1497         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1498         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1499         struct mdd_device *mdd = mdo2mdd(src_pobj);
1500         struct mdd_object *mdd_sobj = NULL;
1501         struct mdd_object *mdd_tobj = NULL;
1502         struct dynlock_handle *sdlh, *tdlh;
1503         struct thandle *handle;
1504         int is_dir;
1505         int rc;
1506         ENTRY;
1507
1508         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1509         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1510
1511         if (tobj)
1512                 mdd_tobj = md2mdd_obj(tobj);
1513
1514         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1515         handle = mdd_trans_start(env, mdd);
1516         if (IS_ERR(handle))
1517                 RETURN(PTR_ERR(handle));
1518
1519         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1520         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1521         if (rc < 0)
1522                 GOTO(cleanup_unlocked, rc);
1523
1524         /* Get locks in determined order */
1525         if (rc == MDD_RN_SAME) {
1526                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1527                 /* check hashes to determine do we need one lock or two */
1528                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1529                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1530                 else
1531                         tdlh = sdlh;
1532         } else if (rc == MDD_RN_SRCTGT) {
1533                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1534                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1535         } else {
1536                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1537                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1538         }
1539         if (sdlh == NULL || tdlh == NULL)
1540                 GOTO(cleanup, rc = -ENOMEM);
1541
1542         mdd_sobj = mdd_object_find(env, mdd, lf);
1543         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1544                                      mdd_sobj, mdd_tobj, ma);
1545         if (rc)
1546                 GOTO(cleanup, rc);
1547
1548         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1549                                 mdd_object_capa(env, mdd_spobj));
1550         if (rc)
1551                 GOTO(cleanup, rc);
1552
1553         /*
1554          * Here tobj can be remote one, so we do index_delete unconditionally
1555          * and -ENOENT is allowed.
1556          */
1557         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1558                                 mdd_object_capa(env, mdd_tpobj));
1559         if (rc != 0 && rc != -ENOENT)
1560                 GOTO(cleanup, rc);
1561
1562         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1563                                 mdd_object_capa(env, mdd_tpobj));
1564         if (rc)
1565                 GOTO(cleanup, rc);
1566
1567         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1568         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1569
1570         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1571         if (mdd_sobj) {
1572                 la->la_valid = LA_CTIME;
1573                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1574                                                         handle, 0);
1575                 if (rc)
1576                         GOTO(cleanup, rc);
1577         }
1578
1579         /* 
1580          * For tobj is remote case cmm layer has processed
1581          * and set tobj to NULL then. So when tobj is NOT NULL,
1582          * it must be local one.
1583          */
1584         if (tobj && mdd_object_exists(mdd_tobj)) {
1585                 mdd_write_lock(env, mdd_tobj);
1586                 mdo_ref_del(env, mdd_tobj, handle);
1587
1588                 /* Remove dot reference. */
1589                 if (is_dir)
1590                         mdo_ref_del(env, mdd_tobj, handle);
1591
1592                 la->la_valid = LA_CTIME;
1593                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1594                 if (rc)
1595                         GOTO(cleanup, rc);
1596
1597                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1598                 mdd_write_unlock(env, mdd_tobj);
1599                 if (rc)
1600                         GOTO(cleanup, rc);
1601         }
1602
1603         la->la_valid = LA_CTIME | LA_MTIME;
1604         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
1605         if (rc)
1606                 GOTO(cleanup, rc);
1607
1608         if (mdd_spobj != mdd_tpobj) {
1609                 la->la_valid = LA_CTIME | LA_MTIME;
1610                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
1611                                                   handle, 0);
1612         }
1613
1614         EXIT;
1615 cleanup:
1616         if (likely(tdlh) && sdlh != tdlh)
1617                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1618         if (likely(sdlh))
1619                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1620 cleanup_unlocked:
1621         mdd_trans_stop(env, mdd, rc, handle);
1622         if (mdd_sobj)
1623                 mdd_object_put(env, mdd_sobj);
1624         return rc;
1625 }
1626
1627 struct md_dir_operations mdd_dir_ops = {
1628         .mdo_is_subdir     = mdd_is_subdir,
1629         .mdo_lookup        = mdd_lookup,
1630         .mdo_create        = mdd_create,
1631         .mdo_rename        = mdd_rename,
1632         .mdo_link          = mdd_link,
1633         .mdo_unlink        = mdd_unlink,
1634         .mdo_name_insert   = mdd_name_insert,
1635         .mdo_name_remove   = mdd_name_remove,
1636         .mdo_rename_tgt    = mdd_rename_tgt,
1637         .mdo_create_data   = mdd_create_data
1638 };