Whamcloud - gitweb
Augment ->do_{read,write}_lock() prototypes with a `role' parameter indicating
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 static int mdd_lookup(const struct lu_env *env,
93                       struct md_object *pobj, const struct lu_name *lname,
94                       struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102
103 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
104                           struct lu_fid *fid)
105 {
106         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
107 }
108
109 /*
110  * For root fid use special function, whcih does not compare version component
111  * of fid. Vresion component is different for root fids on all MDTs.
112  */
113 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
114 {
115         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
116                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
117 }
118
119 /*
120  * return 1: if lf is the fid of the ancestor of p1;
121  * return 0: if not;
122  *
123  * return -EREMOTE: if remote object is found, in this
124  * case fid of remote object is saved to @pf;
125  *
126  * otherwise: values < 0, errors.
127  */
128 static int mdd_is_parent(const struct lu_env *env,
129                          struct mdd_device *mdd,
130                          struct mdd_object *p1,
131                          const struct lu_fid *lf,
132                          struct lu_fid *pf)
133 {
134         struct mdd_object *parent = NULL;
135         struct lu_fid *pfid;
136         int rc;
137         ENTRY;
138
139         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
140         pfid = &mdd_env_info(env)->mti_fid;
141
142         /* Check for root first. */
143         if (mdd_is_root(mdd, mdo2fid(p1)))
144                 RETURN(0);
145
146         for(;;) {
147                 /* this is done recursively, bypass capa for each obj */
148                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
149                 rc = mdd_parent_fid(env, p1, pfid);
150                 if (rc)
151                         GOTO(out, rc);
152                 if (mdd_is_root(mdd, pfid))
153                         GOTO(out, rc = 0);
154                 if (lu_fid_eq(pfid, lf))
155                         GOTO(out, rc = 1);
156                 if (parent)
157                         mdd_object_put(env, parent);
158                 parent = mdd_object_find(env, mdd, pfid);
159
160                 /* cross-ref parent */
161                 if (parent == NULL) {
162                         if (pf != NULL)
163                                 *pf = *pfid;
164                         GOTO(out, rc = -EREMOTE);
165                 } else if (IS_ERR(parent))
166                         GOTO(out, rc = PTR_ERR(parent));
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 static int mdd_is_subdir(const struct lu_env *env,
188                          struct md_object *mo, const struct lu_fid *fid,
189                          struct lu_fid *sfid)
190 {
191         struct mdd_device *mdd = mdo2mdd(mo);
192         int rc;
193         ENTRY;
194
195         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
196                 RETURN(0);
197
198         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
199         if (rc == 0) {
200                 /* found root */
201                 fid_zero(sfid);
202         } else if (rc == 1) {
203                 /* found @fid is parent */
204                 *sfid = *fid;
205                 rc = 0;
206         }
207         RETURN(rc);
208 }
209
210 /*
211  * Check that @dir contains no entries except (possibly) dot and dotdot.
212  *
213  * Returns:
214  *
215  *             0        empty
216  *      -ENOTDIR        not a directory object
217  *    -ENOTEMPTY        not empty
218  *           -ve        other error
219  *
220  */
221 static int mdd_dir_is_empty(const struct lu_env *env,
222                             struct mdd_object *dir)
223 {
224         struct dt_it     *it;
225         struct dt_object *obj;
226         const struct dt_it_ops *iops;
227         int result;
228         ENTRY;
229
230         obj = mdd_object_child(dir);
231         if (!dt_try_as_dir(env, obj))
232                 RETURN(-ENOTDIR);
233
234         iops = &obj->do_index_ops->dio_it;
235         it = iops->init(env, obj, 0, BYPASS_CAPA);
236         if (it != NULL) {
237                 result = iops->get(env, it, (const void *)"");
238                 if (result > 0) {
239                         int i;
240                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
241                                 result = iops->next(env, it);
242                         if (result == 0)
243                                 result = -ENOTEMPTY;
244                         else if (result == +1)
245                                 result = 0;
246                 } else if (result == 0)
247                         /*
248                          * Huh? Index contains no zero key?
249                          */
250                         result = -EIO;
251
252                 iops->put(env, it);
253                 iops->fini(env, it);
254         } else
255                 result = -ENOMEM;
256         RETURN(result);
257 }
258
259 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
260 {
261         struct mdd_device *m = mdd_obj2mdd_dev(obj);
262         struct lu_attr *la = &mdd_env_info(env)->mti_la;
263         int rc;
264         ENTRY;
265
266         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
267         if (rc)
268                 RETURN(rc);
269
270         /*
271          * Subdir count limitation can be broken through.
272          */ 
273         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
274             !S_ISDIR(la->la_mode))
275                 RETURN(-EMLINK);
276         else
277                 RETURN(0);
278 }
279
280 /*
281  * Check whether it may create the cobj under the pobj.
282  * cobj maybe NULL
283  */
284 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
285                    struct mdd_object *cobj, int check_perm, int check_nlink)
286 {
287         int rc = 0;
288         ENTRY;
289
290         if (cobj && mdd_object_exists(cobj))
291                 RETURN(-EEXIST);
292
293         if (mdd_is_dead_obj(pobj))
294                 RETURN(-ENOENT);
295
296         if (check_perm)
297                 rc = mdd_permission_internal_locked(env, pobj, NULL,
298                                                     MAY_WRITE | MAY_EXEC,
299                                                     MOR_TGT_PARENT);
300
301         if (!rc && check_nlink)
302                 rc = __mdd_may_link(env, pobj);
303
304         RETURN(rc);
305 }
306
307 /*
308  * Check whether can unlink from the pobj in the case of "cobj == NULL".
309  */
310 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
311                    const struct md_attr *ma)
312 {
313         int rc;
314         ENTRY;
315
316         if (mdd_is_dead_obj(pobj))
317                 RETURN(-ENOENT);
318
319         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
320             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
321                 RETURN(-EPERM);
322
323         rc = mdd_permission_internal_locked(env, pobj, NULL,
324                                             MAY_WRITE | MAY_EXEC,
325                                             MOR_TGT_PARENT);
326         if (rc)
327                 RETURN(rc);
328
329         if (mdd_is_append(pobj))
330                 RETURN(-EPERM);
331
332         RETURN(rc);
333 }
334
335 /*
336  * pobj == NULL is remote ops case, under such case, pobj's
337  * VTX feature has been checked already, no need check again.
338  */
339 static inline int mdd_is_sticky(const struct lu_env *env,
340                                 struct mdd_object *pobj,
341                                 struct mdd_object *cobj)
342 {
343         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
344         struct md_ucred *uc = md_ucred(env);
345         int rc;
346
347         if (pobj) {
348                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
349                 if (rc)
350                         return rc;
351         
352                 if (!(tmp_la->la_mode & S_ISVTX) ||
353                      (tmp_la->la_uid == uc->mu_fsuid))
354                         return 0;
355         }
356
357         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
358         if (rc) 
359                 return rc;
360         
361         if (tmp_la->la_uid == uc->mu_fsuid)
362                 return 0;
363         
364         return !mdd_capable(uc, CFS_CAP_FOWNER);
365 }
366
367 /*
368  * Check whether it may delete the cobj from the pobj.
369  * pobj maybe NULL
370  */
371 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
372                    struct mdd_object *cobj, struct md_attr *ma,
373                    int check_perm, int check_empty)
374 {
375         int rc = 0;
376         ENTRY;
377
378         LASSERT(cobj);
379         if (!mdd_object_exists(cobj))
380                 RETURN(-ENOENT);
381
382         if (pobj) {
383                 if (mdd_is_dead_obj(pobj))
384                         RETURN(-ENOENT);
385
386                 if (check_perm) {
387                         rc = mdd_permission_internal_locked(env, pobj, NULL,
388                                                     MAY_WRITE | MAY_EXEC,
389                                                     MOR_TGT_PARENT);
390                         if (rc)
391                                 RETURN(rc);
392                 }
393
394                 if (mdd_is_append(pobj))
395                         RETURN(-EPERM);
396         }
397
398         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
399             mdd_is_sticky(env, pobj, cobj))
400                 RETURN(-EPERM);
401
402         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
403                 RETURN(-EPERM);
404
405         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
406             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
407                 RETURN(-EPERM);
408
409         if (S_ISDIR(ma->ma_attr.la_mode)) {
410                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
411
412                 if (!S_ISDIR(mdd_object_type(cobj)))
413                         RETURN(-ENOTDIR);
414
415                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
416                         RETURN(-EBUSY);
417         } else if (S_ISDIR(mdd_object_type(cobj)))
418                 RETURN(-EISDIR);
419
420         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
421                 rc = mdd_dir_is_empty(env, cobj);
422
423         RETURN(rc);
424 }
425
426 /*
427  * tgt maybe NULL
428  * has mdd_write_lock on src already, but not on tgt yet
429  */
430 int mdd_link_sanity_check(const struct lu_env *env,
431                           struct mdd_object *tgt_obj,
432                           const struct lu_name *lname,
433                           struct mdd_object *src_obj)
434 {
435         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
436         int rc = 0;
437         ENTRY;
438
439         /* Local ops, no lookup before link, check filename length here. */
440         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
441                 RETURN(-ENAMETOOLONG);
442
443         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
444                 RETURN(-EPERM);
445
446         if (S_ISDIR(mdd_object_type(src_obj)))
447                 RETURN(-EPERM);
448
449         LASSERT(src_obj != tgt_obj);
450         if (tgt_obj) {
451                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
452                 if (rc)
453                         RETURN(rc);
454         }
455
456         rc = __mdd_may_link(env, src_obj);
457
458         RETURN(rc);
459 }
460
461 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
462                                    const struct lu_fid *fid)
463 {
464         struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
465
466         fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
467         return (const struct dt_rec *)pack;
468 }
469
470 /**
471  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
472  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
473  * (maybe too large to be represented). It is a trick to break through the
474  * "i_nlink" limitation for subdir count.
475  */
476 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
477                    struct thandle *handle)
478 {
479         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
480         struct mdd_device *m = mdd_obj2mdd_dev(obj);
481
482         if (!mdd_is_mnlink(obj)) {
483                 if (S_ISDIR(mdd_object_type(obj))) {
484                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
485                                 return;
486
487                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
488                                 obj->mod_flags |= MNLINK_OBJ;
489                                 tmp_la->la_nlink = 1;
490                                 tmp_la->la_valid = LA_NLINK;
491                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
492                                                       0);
493                                 return;
494                         }
495                 }
496                 mdo_ref_add(env, obj, handle);
497         }
498 }
499
500 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
501                    struct thandle *handle, int is_dot)
502 {
503         if (!mdd_is_mnlink(obj) || is_dot)
504                 mdo_ref_del(env, obj, handle);
505 }
506
507 /* insert named index, add reference if isdir */
508 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
509                               const struct lu_fid *lf, const char *name, int is_dir,
510                               struct thandle *handle, struct lustre_capa *capa)
511 {
512         struct dt_object *next = mdd_object_child(pobj);
513         int               rc;
514         ENTRY;
515
516         if (dt_try_as_dir(env, next)) {
517                 rc = next->do_index_ops->dio_insert(env, next,
518                                                     __mdd_fid_rec(env, lf),
519                                                     (const struct dt_key *)name,
520                                                     handle, capa);
521         } else {
522                 rc = -ENOTDIR;
523         }
524
525         if (rc == 0) {
526                 if (is_dir) {
527                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
528                         __mdd_ref_add(env, pobj, handle);
529                         mdd_write_unlock(env, pobj);
530                 }
531         }
532         RETURN(rc);
533 }
534
535 /* delete named index, drop reference if isdir */
536 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
537                               const char *name, int is_dir, struct thandle *handle,
538                               struct lustre_capa *capa)
539 {
540         struct dt_object *next = mdd_object_child(pobj);
541         int               rc;
542         ENTRY;
543
544         if (dt_try_as_dir(env, next)) {
545                 rc = next->do_index_ops->dio_delete(env, next,
546                                                     (struct dt_key *)name,
547                                                     handle, capa);
548                 if (rc == 0 && is_dir) {
549                         int is_dot = 0;
550
551                         if (name != NULL && name[0] == '.' && name[1] == 0)
552                                 is_dot = 1;
553                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
554                         __mdd_ref_del(env, pobj, handle, is_dot);
555                         mdd_write_unlock(env, pobj);
556                 }
557         } else
558                 rc = -ENOTDIR;
559
560         RETURN(rc);
561 }
562
563 static int
564 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
565                         const struct lu_fid *lf, const char *name,
566                         struct thandle *handle, struct lustre_capa *capa)
567 {
568         struct dt_object *next = mdd_object_child(pobj);
569         int               rc;
570         ENTRY;
571
572         if (dt_try_as_dir(env, next)) {
573                 rc = next->do_index_ops->dio_insert(env, next,
574                                                     __mdd_fid_rec(env, lf),
575                                                     (const struct dt_key *)name,
576                                                     handle, capa);
577         } else {
578                 rc = -ENOTDIR;
579         }
580         RETURN(rc);
581 }
582
583 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
584                     struct md_object *src_obj, const struct lu_name *lname,
585                     struct md_attr *ma)
586 {
587         char *name = lname->ln_name;
588         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
589         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
590         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
591         struct mdd_device *mdd = mdo2mdd(src_obj);
592         struct dynlock_handle *dlh;
593         struct thandle *handle;
594         int rc;
595         ENTRY;
596
597         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
598         handle = mdd_trans_start(env, mdd);
599         if (IS_ERR(handle))
600                 RETURN(PTR_ERR(handle));
601
602         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
603         if (dlh == NULL)
604                 GOTO(out_trans, rc = -ENOMEM);
605         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
606
607         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
608         if (rc)
609                 GOTO(out_unlock, rc);
610
611         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
612                                      name, handle,
613                                      mdd_object_capa(env, mdd_tobj));
614         if (rc)
615                 GOTO(out_unlock, rc);
616
617         __mdd_ref_add(env, mdd_sobj, handle);
618
619         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
620         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
621
622         la->la_valid = LA_CTIME | LA_MTIME;
623         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
624         if (rc)
625                 GOTO(out_unlock, rc);
626
627         la->la_valid = LA_CTIME;
628         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
629         EXIT;
630 out_unlock:
631         mdd_write_unlock(env, mdd_sobj);
632         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
633 out_trans:
634         mdd_trans_stop(env, mdd, rc, handle);
635         return rc;
636 }
637
638 /* caller should take a lock before calling */
639 int mdd_finish_unlink(const struct lu_env *env,
640                       struct mdd_object *obj, struct md_attr *ma,
641                       struct thandle *th)
642 {
643         int rc;
644         ENTRY;
645
646         rc = mdd_iattr_get(env, obj, ma);
647         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
648                 /* add new orphan and the object
649                  * will be deleted during the object_put() */
650                 if (__mdd_orphan_add(env, obj, th) == 0)
651                         obj->mod_flags |= ORPHAN_OBJ;
652
653                 obj->mod_flags |= DEAD_OBJ;
654                 if (obj->mod_count == 0)
655                         rc = mdd_object_kill(env, obj, ma);
656                 else
657                         /* clear MA_LOV | MA_COOKIE, if we do not
658                          * unlink it in case we get it somewhere */
659                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
660         } else
661                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
662
663         RETURN(rc);
664 }
665
666 /*
667  * pobj maybe NULL
668  * has mdd_write_lock on cobj already, but not on pobj yet
669  */
670 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
671                             struct mdd_object *cobj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
677
678         RETURN(rc);
679 }
680
681 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
682                       struct md_object *cobj, const struct lu_name *lname,
683                       struct md_attr *ma)
684 {
685         char *name = lname->ln_name;
686         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
687         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
688         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
689         struct mdd_device *mdd = mdo2mdd(pobj);
690         struct dynlock_handle *dlh;
691         struct thandle    *handle;
692         int rc, is_dir;
693         ENTRY;
694
695         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
696                  PFID(mdd_object_fid(mdd_cobj)));
697
698         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
699         if (rc)
700                 RETURN(rc);
701
702         handle = mdd_trans_start(env, mdd);
703         if (IS_ERR(handle))
704                 RETURN(PTR_ERR(handle));
705
706
707         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
708         if (dlh == NULL)
709                 GOTO(out_trans, rc = -ENOMEM);
710         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
711
712         is_dir = S_ISDIR(ma->ma_attr.la_mode);
713         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
714         if (rc)
715                 GOTO(cleanup, rc);
716
717         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
718                                 mdd_object_capa(env, mdd_pobj));
719         if (rc)
720                 GOTO(cleanup, rc);
721
722         __mdd_ref_del(env, mdd_cobj, handle, 0);
723         if (is_dir)
724                 /* unlink dot */
725                 __mdd_ref_del(env, mdd_cobj, handle, 1);
726
727         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
728         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
729
730         la->la_valid = LA_CTIME | LA_MTIME;
731         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
732         if (rc)
733                 GOTO(cleanup, rc);
734
735         la->la_valid = LA_CTIME;
736         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
737         if (rc)
738                 GOTO(cleanup, rc);
739
740         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
741
742         if (rc == 0)
743                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
744                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
745                                    NULL, NULL);
746         EXIT;
747 cleanup:
748         mdd_write_unlock(env, mdd_cobj);
749         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
750 out_trans:
751         mdd_trans_stop(env, mdd, rc, handle);
752         return rc;
753 }
754
755 /* has not lock on pobj yet */
756 static int mdd_ni_sanity_check(const struct lu_env *env,
757                                struct md_object *pobj,
758                                const struct md_attr *ma)
759 {
760         struct mdd_object *obj = md2mdd_obj(pobj);
761         int rc;
762         ENTRY;
763
764         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
765                 RETURN(0);
766
767         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
768
769         RETURN(rc);
770 }
771
772 /*
773  * Partial operation.
774  */
775 static int mdd_name_insert(const struct lu_env *env,
776                            struct md_object *pobj,
777                            const struct lu_name *lname,
778                            const struct lu_fid *fid,
779                            const struct md_attr *ma)
780 {
781         char *name = lname->ln_name;
782         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
783         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
784         struct mdd_device *mdd = mdo2mdd(pobj);
785         struct dynlock_handle *dlh;
786         struct thandle *handle;
787         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
788         int rc;
789         ENTRY;
790
791         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
792         handle = mdd_trans_start(env, mdo2mdd(pobj));
793         if (IS_ERR(handle))
794                 RETURN(PTR_ERR(handle));
795
796         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
797         if (dlh == NULL)
798                 GOTO(out_trans, rc = -ENOMEM);
799
800         rc = mdd_ni_sanity_check(env, pobj, ma);
801         if (rc)
802                 GOTO(out_unlock, rc);
803
804         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
805                                 handle, BYPASS_CAPA);
806         if (rc)
807                 GOTO(out_unlock, rc);
808
809         /*
810          * For some case, no need update obj's ctime (LA_CTIME is not set),
811          * e.g. split_dir.
812          * For other cases, update obj's ctime (LA_CTIME is set),
813          * e.g. cmr_link.
814          */
815         if (ma->ma_attr.la_valid & LA_CTIME) {
816                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
817                 la->la_valid = LA_CTIME | LA_MTIME;
818                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
819                                                         handle, 0);
820         }
821         EXIT;
822 out_unlock:
823         mdd_pdo_write_unlock(env, mdd_obj, dlh);
824 out_trans:
825         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
826         return rc;
827 }
828
829 /* has not lock on pobj yet */
830 static int mdd_nr_sanity_check(const struct lu_env *env,
831                                struct md_object *pobj,
832                                const struct md_attr *ma)
833 {
834         struct mdd_object *obj = md2mdd_obj(pobj);
835         int rc;
836         ENTRY;
837
838         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
839                 RETURN(0);
840
841         rc = mdd_may_unlink(env, obj, ma);
842
843         RETURN(rc);
844 }
845
846 /*
847  * Partial operation.
848  */
849 static int mdd_name_remove(const struct lu_env *env,
850                            struct md_object *pobj,
851                            const struct lu_name *lname,
852                            const struct md_attr *ma)
853 {
854         char *name = lname->ln_name;
855         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
856         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
857         struct mdd_device *mdd = mdo2mdd(pobj);
858         struct dynlock_handle *dlh;
859         struct thandle *handle;
860         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
861         int rc;
862         ENTRY;
863
864         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
865         handle = mdd_trans_start(env, mdd);
866         if (IS_ERR(handle))
867                 RETURN(PTR_ERR(handle));
868
869         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
870         if (dlh == NULL)
871                 GOTO(out_trans, rc = -ENOMEM);
872
873         rc = mdd_nr_sanity_check(env, pobj, ma);
874         if (rc)
875                 GOTO(out_unlock, rc);
876
877         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
878                                 handle, BYPASS_CAPA);
879         if (rc)
880                 GOTO(out_unlock, rc);
881
882         /*
883          * For some case, no need update obj's ctime (LA_CTIME is not set),
884          * e.g. split_dir.
885          * For other cases, update obj's ctime (LA_CTIME is set),
886          * e.g. cmr_unlink.
887          */
888         if (ma->ma_attr.la_valid & LA_CTIME) {
889                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
890                 la->la_valid = LA_CTIME | LA_MTIME;
891                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
892                                                         handle, 0);
893         }
894         EXIT;
895 out_unlock:
896         mdd_pdo_write_unlock(env, mdd_obj, dlh);
897 out_trans:
898         mdd_trans_stop(env, mdd, rc, handle);
899         return rc;
900 }
901
902 /*
903  * tobj maybe NULL
904  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
905  */
906 static int mdd_rt_sanity_check(const struct lu_env *env,
907                                struct mdd_object *tgt_pobj,
908                                struct mdd_object *tobj,
909                                struct md_attr *ma)
910 {
911         int rc;
912         ENTRY;
913
914         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
915                 RETURN(0);
916
917         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
918          * exist. In fact, tobj must exist, otherwise the call trace will be:
919          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
920          * When get here, tobj must be NOT NULL, the other case has been
921          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
922          * MDS_PERM_BYPASS.
923          * So check may_delete, but not check nlink of tgt_pobj. */
924         LASSERT(tobj);
925         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
926
927         RETURN(rc);
928 }
929
930 static int mdd_rename_tgt(const struct lu_env *env,
931                           struct md_object *pobj, struct md_object *tobj,
932                           const struct lu_fid *lf, const struct lu_name *lname,
933                           struct md_attr *ma)
934 {
935         char *name = lname->ln_name;
936         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
937         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
938         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
939         struct mdd_device *mdd = mdo2mdd(pobj);
940         struct dynlock_handle *dlh;
941         struct thandle *handle;
942         int rc;
943         ENTRY;
944
945         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
946         handle = mdd_trans_start(env, mdd);
947         if (IS_ERR(handle))
948                 RETURN(PTR_ERR(handle));
949
950         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
951         if (dlh == NULL)
952                 GOTO(out_trans, rc = -ENOMEM);
953         if (tobj)
954                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
955
956         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
957         if (rc)
958                 GOTO(cleanup, rc);
959
960         /*
961          * If rename_tgt is called then we should just re-insert name with
962          * correct fid, no need to dec/inc parent nlink if obj is dir.
963          */
964         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
965         if (rc)
966                 GOTO(cleanup, rc);
967
968         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
969                                      BYPASS_CAPA);
970         if (rc)
971                 GOTO(cleanup, rc);
972
973         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
974         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
975
976         la->la_valid = LA_CTIME | LA_MTIME;
977         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
978         if (rc)
979                 GOTO(cleanup, rc);
980
981         /* 
982          * For tobj is remote case cmm layer has processed
983          * and pass NULL tobj to here. So when tobj is NOT NULL,
984          * it must be local one.
985          */
986         if (tobj && mdd_object_exists(mdd_tobj)) {
987                 __mdd_ref_del(env, mdd_tobj, handle, 0);
988
989                 /* Remove dot reference. */
990                 if (S_ISDIR(ma->ma_attr.la_mode))
991                         __mdd_ref_del(env, mdd_tobj, handle, 1);
992
993                 la->la_valid = LA_CTIME;
994                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
995                 if (rc)
996                         GOTO(cleanup, rc);
997
998                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
999                 if (rc)
1000                         GOTO(cleanup, rc);
1001         }
1002         EXIT;
1003 cleanup:
1004         if (tobj)
1005                 mdd_write_unlock(env, mdd_tobj);
1006         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1007 out_trans:
1008         mdd_trans_stop(env, mdd, rc, handle);
1009         return rc;
1010 }
1011
1012 /*
1013  * The permission has been checked when obj created, no need check again.
1014  */
1015 static int mdd_cd_sanity_check(const struct lu_env *env,
1016                                struct mdd_object *obj)
1017 {
1018         ENTRY;
1019
1020         /* EEXIST check */
1021         if (!obj || mdd_is_dead_obj(obj))
1022                 RETURN(-ENOENT);
1023
1024         RETURN(0);
1025
1026 }
1027
1028 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1029                            struct md_object *cobj, const struct md_op_spec *spec,
1030                            struct md_attr *ma)
1031 {
1032         struct mdd_device *mdd = mdo2mdd(cobj);
1033         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1034         struct mdd_object *son = md2mdd_obj(cobj);
1035         struct lu_attr    *attr = &ma->ma_attr;
1036         struct lov_mds_md *lmm = NULL;
1037         int                lmm_size = 0;
1038         struct thandle    *handle;
1039         int                rc;
1040         ENTRY;
1041
1042         rc = mdd_cd_sanity_check(env, son);
1043         if (rc)
1044                 RETURN(rc);
1045
1046         if (!md_should_create(spec->sp_cr_flags))
1047                 RETURN(0);
1048
1049         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1050                             spec, attr);
1051         if (rc)
1052                 RETURN(rc);
1053
1054         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1055         handle = mdd_trans_start(env, mdd);
1056         if (IS_ERR(handle))
1057                 GOTO(out_free, rc = PTR_ERR(handle));
1058
1059         /*
1060          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1061          * Should this be fixed?
1062          */
1063
1064         /* Replay creates has objects already */
1065 #if 0
1066         if (spec->u.sp_ea.no_lov_create) {
1067                 CDEBUG(D_INFO, "we already have lov ea\n");
1068                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1069                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1070                                     spec->u.sp_ea.eadatalen, handle, 0);
1071         } else
1072 #endif
1073                 /* No need mdd_lsm_sanity_check here */
1074                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1075                                     lmm_size, handle, 0);
1076
1077         if (rc == 0)
1078                rc = mdd_attr_get_internal_locked(env, son, ma);
1079
1080         /* update lov_objid data, must be before transaction stop! */
1081         if (rc == 0)
1082                 mdd_lov_objid_update(mdd, lmm);
1083
1084         mdd_trans_stop(env, mdd, rc, handle);
1085 out_free:
1086         /* Finish mdd_lov_create() stuff. */
1087         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1088         RETURN(rc);
1089 }
1090
1091 static int
1092 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1093              const struct lu_name *lname, struct lu_fid* fid, int mask)
1094 {
1095         char                *name = lname->ln_name;
1096         const struct dt_key *key = (const struct dt_key *)name;
1097         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1098         struct mdd_device   *m = mdo2mdd(pobj);
1099         struct dt_object    *dir = mdd_object_child(mdd_obj);
1100         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1101         int rc;
1102         ENTRY;
1103
1104         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1105                 RETURN(-ESTALE);
1106
1107         rc = mdd_object_exists(mdd_obj);
1108         if (unlikely(rc == 0))
1109                 RETURN(-ESTALE);
1110         else if (unlikely(rc < 0)) {
1111                 CERROR("Object "DFID" locates on remote server\n",
1112                         PFID(mdo2fid(mdd_obj)));
1113                 LBUG();
1114         }
1115
1116         /* The common filename length check. */
1117         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1118                 RETURN(-ENAMETOOLONG);
1119
1120         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1121                                             MOR_TGT_PARENT);
1122         if (rc)
1123                 RETURN(rc);
1124
1125         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1126                    dt_try_as_dir(env, dir))) {
1127                 rc = dir->do_index_ops->dio_lookup(env, dir,
1128                                                  (struct dt_rec *)pack, key,
1129                                                  mdd_object_capa(env, mdd_obj));
1130                 if (rc == 0)
1131                         rc = fid_unpack(pack, fid);
1132         } else
1133                 rc = -ENOTDIR;
1134
1135         RETURN(rc);
1136 }
1137
1138 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1139                           struct mdd_object *child, struct md_attr *ma,
1140                           struct thandle *handle)
1141 {
1142         int rc;
1143         ENTRY;
1144
1145         /*
1146          * Update attributes for child.
1147          *
1148          * FIXME:
1149          *  (1) the valid bits should be converted between Lustre and Linux;
1150          *  (2) maybe, the child attributes should be set in OSD when creation.
1151          */
1152
1153         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1154         if (rc != 0)
1155                 RETURN(rc);
1156
1157         if (S_ISDIR(ma->ma_attr.la_mode)) {
1158                 /* Add "." and ".." for newly created dir */
1159                 __mdd_ref_add(env, child, handle);
1160                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1161                                              dot, handle, BYPASS_CAPA);
1162                 if (rc == 0) {
1163                         rc = __mdd_index_insert_only(env, child, pfid,
1164                                                      dotdot, handle,
1165                                                      BYPASS_CAPA);
1166                         if (rc != 0) {
1167                                 int rc2;
1168
1169                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1170                                                          handle, BYPASS_CAPA);
1171                                 if (rc2 != 0)
1172                                         CERROR("Failure to cleanup after dotdot"
1173                                                " creation: %d (%d)\n", rc2, rc);
1174                         }
1175                 }
1176         }
1177         RETURN(rc);
1178 }
1179
1180 /* has not lock on pobj yet */
1181 static int mdd_create_sanity_check(const struct lu_env *env,
1182                                    struct md_object *pobj,
1183                                    const struct lu_name *lname,
1184                                    struct md_attr *ma,
1185                                    struct md_op_spec *spec)
1186 {
1187         struct mdd_thread_info *info = mdd_env_info(env);
1188         struct lu_attr    *la        = &info->mti_la;
1189         struct lu_fid     *fid       = &info->mti_fid;
1190         struct mdd_object *obj       = md2mdd_obj(pobj);
1191         struct mdd_device *m         = mdo2mdd(pobj);
1192         int lookup                   = spec->sp_cr_lookup;
1193         int rc;
1194         ENTRY;
1195
1196         /* EEXIST check */
1197         if (mdd_is_dead_obj(obj))
1198                 RETURN(-ENOENT);
1199
1200         /*
1201          * In some cases this lookup is not needed - we know before if name
1202          * exists or not because MDT performs lookup for it.
1203          * name length check is done in lookup.
1204          */
1205         if (lookup) {
1206                 /*
1207                  * Check if the name already exist, though it will be checked in
1208                  * _index_insert also, for avoiding rolling back if exists
1209                  * _index_insert.
1210                  */
1211                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1212                                          MAY_WRITE | MAY_EXEC);
1213                 if (rc != -ENOENT)
1214                         RETURN(rc ? : -EEXIST);
1215         } else {
1216                 /*
1217                  * Check WRITE permission for the parent.
1218                  * EXEC permission have been checked
1219                  * when lookup before create already.
1220                  */
1221                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1222                                                     MOR_TGT_PARENT);
1223                 if (rc)
1224                         RETURN(rc);
1225         }
1226
1227         /* sgid check */
1228         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1229         if (rc != 0)
1230                 RETURN(rc);
1231
1232         if (la->la_mode & S_ISGID) {
1233                 ma->ma_attr.la_gid = la->la_gid;
1234                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1235                         ma->ma_attr.la_mode |= S_ISGID;
1236                         ma->ma_attr.la_valid |= LA_MODE;
1237                 }
1238         }
1239
1240         switch (ma->ma_attr.la_mode & S_IFMT) {
1241         case S_IFLNK: {
1242                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1243
1244                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1245                         RETURN(-ENAMETOOLONG);
1246                 else
1247                         RETURN(0);
1248         }
1249         case S_IFDIR:
1250         case S_IFREG:
1251         case S_IFCHR:
1252         case S_IFBLK:
1253         case S_IFIFO:
1254         case S_IFSOCK:
1255                 rc = 0;
1256                 break;
1257         default:
1258                 rc = -EINVAL;
1259                 break;
1260         }
1261         RETURN(rc);
1262 }
1263
1264 /*
1265  * Create object and insert it into namespace.
1266  */
1267 static int mdd_create(const struct lu_env *env,
1268                       struct md_object *pobj,
1269                       const struct lu_name *lname,
1270                       struct md_object *child,
1271                       struct md_op_spec *spec,
1272                       struct md_attr* ma)
1273 {
1274         struct mdd_thread_info *info = mdd_env_info(env);
1275         struct lu_attr         *la = &info->mti_la_for_fix;
1276         struct md_attr         *ma_acl = &info->mti_ma;
1277         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1278         struct mdd_object      *son = md2mdd_obj(child);
1279         struct mdd_device      *mdd = mdo2mdd(pobj);
1280         struct lu_attr         *attr = &ma->ma_attr;
1281         struct lov_mds_md      *lmm = NULL;
1282         struct thandle         *handle;
1283         struct dynlock_handle  *dlh;
1284         char                   *name = lname->ln_name;
1285         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1286         int got_def_acl = 0;
1287         ENTRY;
1288
1289         /*
1290          * Two operations have to be performed:
1291          *
1292          *  - an allocation of a new object (->do_create()), and
1293          *
1294          *  - an insertion into a parent index (->dio_insert()).
1295          *
1296          * Due to locking, operation order is not important, when both are
1297          * successful, *but* error handling cases are quite different:
1298          *
1299          *  - if insertion is done first, and following object creation fails,
1300          *  insertion has to be rolled back, but this operation might fail
1301          *  also leaving us with dangling index entry.
1302          *
1303          *  - if creation is done first, is has to be undone if insertion
1304          *  fails, leaving us with leaked space, which is neither good, nor
1305          *  fatal.
1306          *
1307          * It seems that creation-first is simplest solution, but it is
1308          * sub-optimal in the frequent
1309          *
1310          *         $ mkdir foo
1311          *         $ mkdir foo
1312          *
1313          * case, because second mkdir is bound to create object, only to
1314          * destroy it immediately.
1315          *
1316          * To avoid this follow local file systems that do double lookup:
1317          *
1318          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1319          *
1320          *     1. create            (mdd_object_create_internal())
1321          *
1322          *     2. insert            (__mdd_index_insert(), lookup again)
1323          */
1324
1325         /* Sanity checks before big job. */
1326         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1327         if (rc)
1328                 RETURN(rc);
1329
1330         /*
1331          * No RPC inside the transaction, so OST objects should be created at
1332          * first.
1333          */
1334         if (S_ISREG(attr->la_mode)) {
1335                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1336                                     spec, attr);
1337                 if (rc)
1338                         RETURN(rc);
1339         }
1340
1341         if (!S_ISLNK(attr->la_mode)) {
1342                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1343                 ma_acl->ma_acl = info->mti_xattr_buf;
1344                 ma_acl->ma_need = MA_ACL_DEF;
1345                 ma_acl->ma_valid = 0;
1346
1347                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1348                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1349                 mdd_read_unlock(env, mdd_pobj);
1350                 if (rc)
1351                         GOTO(out_free, rc);
1352                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1353                         got_def_acl = 1;
1354         }
1355
1356         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1357         handle = mdd_trans_start(env, mdd);
1358         if (IS_ERR(handle))
1359                 GOTO(out_free, rc = PTR_ERR(handle));
1360
1361         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1362         if (dlh == NULL)
1363                 GOTO(out_trans, rc = -ENOMEM);
1364
1365         mdd_write_lock(env, son, MOR_TGT_CHILD);
1366         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
1367         if (rc) {
1368                 mdd_write_unlock(env, son);
1369                 GOTO(cleanup, rc);
1370         }
1371
1372         created = 1;
1373
1374 #ifdef CONFIG_FS_POSIX_ACL
1375         if (got_def_acl) {
1376                 struct lu_buf *acl_buf = &info->mti_buf;
1377                 acl_buf->lb_buf = ma_acl->ma_acl;
1378                 acl_buf->lb_len = ma_acl->ma_acl_size;
1379
1380                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1381                 if (rc) {
1382                         mdd_write_unlock(env, son);
1383                         GOTO(cleanup, rc);
1384                 } else {
1385                         ma->ma_attr.la_valid |= LA_MODE;
1386                 }
1387         }
1388 #endif
1389
1390         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1391                                    son, ma, handle);
1392         mdd_write_unlock(env, son);
1393         if (rc)
1394                 /*
1395                  * Object has no links, so it will be destroyed when last
1396                  * reference is released. (XXX not now.)
1397                  */
1398                 GOTO(cleanup, rc);
1399
1400         initialized = 1;
1401
1402         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1403                                 name, S_ISDIR(attr->la_mode), handle,
1404                                 mdd_object_capa(env, mdd_pobj));
1405
1406         if (rc)
1407                 GOTO(cleanup, rc);
1408
1409         inserted = 1;
1410
1411         /* No need mdd_lsm_sanity_check here */
1412         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1413         if (rc) {
1414                 CERROR("error on stripe info copy %d \n", rc);
1415                 GOTO(cleanup, rc);
1416         }
1417         if (lmm && lmm_size > 0) {
1418                 /* Set Lov here, do not get lmm again later */
1419                 memcpy(ma->ma_lmm, lmm, lmm_size);
1420                 ma->ma_lmm_size = lmm_size;
1421                 ma->ma_valid |= MA_LOV;
1422         }
1423
1424         if (S_ISLNK(attr->la_mode)) {
1425                 struct dt_object *dt = mdd_object_child(son);
1426                 const char *target_name = spec->u.sp_symname;
1427                 int sym_len = strlen(target_name);
1428                 const struct lu_buf *buf;
1429                 loff_t pos = 0;
1430
1431                 buf = mdd_buf_get_const(env, target_name, sym_len);
1432                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1433                                                 mdd_object_capa(env, son));
1434
1435                 if (rc == sym_len)
1436                         rc = 0;
1437                 else
1438                         GOTO(cleanup, rc = -EFAULT);
1439         }
1440
1441         *la = ma->ma_attr;
1442         la->la_valid = LA_CTIME | LA_MTIME;
1443         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1444         if (rc)
1445                 GOTO(cleanup, rc);
1446
1447         /* Return attr back. */
1448         rc = mdd_attr_get_internal_locked(env, son, ma);
1449         EXIT;
1450 cleanup:
1451         if (rc && created) {
1452                 int rc2 = 0;
1453
1454                 if (inserted) {
1455                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1456                                                  S_ISDIR(attr->la_mode),
1457                                                  handle, BYPASS_CAPA);
1458                         if (rc2)
1459                                 CERROR("error can not cleanup destroy %d\n",
1460                                        rc2);
1461                 }
1462
1463                 if (rc2 == 0) {
1464                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1465                         __mdd_ref_del(env, son, handle, 0);
1466                         if (initialized && S_ISDIR(attr->la_mode))
1467                                 __mdd_ref_del(env, son, handle, 1);
1468                         mdd_write_unlock(env, son);
1469                 }
1470         }
1471
1472         /* update lov_objid data, must be before transaction stop! */
1473         if (rc == 0)
1474                 mdd_lov_objid_update(mdd, lmm);
1475
1476         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1477 out_trans:
1478         mdd_trans_stop(env, mdd, rc, handle);
1479 out_free:
1480         /* finis lov_create stuff, free all temporary data */
1481         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1482         return rc;
1483 }
1484
1485 /*
1486  * Get locks on parents in proper order
1487  * RETURN: < 0 - error, rename_order if successful
1488  */
1489 enum rename_order {
1490         MDD_RN_SAME,
1491         MDD_RN_SRCTGT,
1492         MDD_RN_TGTSRC
1493 };
1494
1495 static int mdd_rename_order(const struct lu_env *env,
1496                             struct mdd_device *mdd,
1497                             struct mdd_object *src_pobj,
1498                             struct mdd_object *tgt_pobj)
1499 {
1500         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1501         int rc;
1502         ENTRY;
1503
1504         if (src_pobj == tgt_pobj)
1505                 RETURN(MDD_RN_SAME);
1506
1507         /* compared the parent child relationship of src_p&tgt_p */
1508         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1509                 rc = MDD_RN_SRCTGT;
1510         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1511                 rc = MDD_RN_TGTSRC;
1512         } else {
1513                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1514                 if (rc == -EREMOTE)
1515                         rc = 0;
1516
1517                 if (rc == 1)
1518                         rc = MDD_RN_TGTSRC;
1519                 else if (rc == 0)
1520                         rc = MDD_RN_SRCTGT;
1521         }
1522
1523         RETURN(rc);
1524 }
1525
1526 /* has not mdd_write{read}_lock on any obj yet. */
1527 static int mdd_rename_sanity_check(const struct lu_env *env,
1528                                    struct mdd_object *src_pobj,
1529                                    struct mdd_object *tgt_pobj,
1530                                    struct mdd_object *sobj,
1531                                    struct mdd_object *tobj,
1532                                    struct md_attr *ma)
1533 {
1534         int rc = 0;
1535         ENTRY;
1536
1537         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1538                 RETURN(0);
1539
1540         /* XXX: when get here, sobj must NOT be NULL,
1541          * the other case has been processed in cml_rename
1542          * before mdd_rename and enable MDS_PERM_BYPASS. */
1543         LASSERT(sobj);
1544         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1545         if (rc)
1546                 RETURN(rc);
1547
1548         /* XXX: when get here, "tobj == NULL" means tobj must
1549          * NOT exist (neither on remote MDS, such case has been
1550          * processed in cml_rename before mdd_rename and enable
1551          * MDS_PERM_BYPASS).
1552          * So check may_create, but not check may_unlink. */
1553         if (!tobj)
1554                 rc = mdd_may_create(env, tgt_pobj, NULL,
1555                                     (src_pobj != tgt_pobj), 0);
1556         else
1557                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1558                                     (src_pobj != tgt_pobj), 1);
1559
1560         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1561             S_ISDIR(ma->ma_attr.la_mode))
1562                 rc = __mdd_may_link(env, tgt_pobj);
1563
1564         RETURN(rc);
1565 }
1566
1567 /* src object can be remote that is why we use only fid and type of object */
1568 static int mdd_rename(const struct lu_env *env,
1569                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1570                       const struct lu_fid *lf, const struct lu_name *lsname,
1571                       struct md_object *tobj, const struct lu_name *ltname,
1572                       struct md_attr *ma)
1573 {
1574         char *sname = lsname->ln_name;
1575         char *tname = ltname->ln_name;
1576         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1577         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1578         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1579         struct mdd_device *mdd = mdo2mdd(src_pobj);
1580         struct mdd_object *mdd_sobj = NULL;
1581         struct mdd_object *mdd_tobj = NULL;
1582         struct dynlock_handle *sdlh, *tdlh;
1583         struct thandle *handle;
1584         int is_dir;
1585         int rc;
1586         ENTRY;
1587
1588         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1589         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1590
1591         if (tobj)
1592                 mdd_tobj = md2mdd_obj(tobj);
1593
1594         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1595         handle = mdd_trans_start(env, mdd);
1596         if (IS_ERR(handle))
1597                 RETURN(PTR_ERR(handle));
1598
1599         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1600         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1601         if (rc < 0)
1602                 GOTO(cleanup_unlocked, rc);
1603
1604         /* Get locks in determined order */
1605         if (rc == MDD_RN_SAME) {
1606                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
1607                                           sname, MOR_SRC_PARENT);
1608                 /* check hashes to determine do we need one lock or two */
1609                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1610                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
1611                                 MOR_TGT_PARENT);
1612                 else
1613                         tdlh = sdlh;
1614         } else if (rc == MDD_RN_SRCTGT) {
1615                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
1616                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
1617         } else {
1618                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
1619                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
1620         }
1621         if (sdlh == NULL || tdlh == NULL)
1622                 GOTO(cleanup, rc = -ENOMEM);
1623
1624         mdd_sobj = mdd_object_find(env, mdd, lf);
1625         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1626                                      mdd_sobj, mdd_tobj, ma);
1627         if (rc)
1628                 GOTO(cleanup, rc);
1629
1630         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1631                                 mdd_object_capa(env, mdd_spobj));
1632         if (rc)
1633                 GOTO(cleanup, rc);
1634
1635         /*
1636          * Here tobj can be remote one, so we do index_delete unconditionally
1637          * and -ENOENT is allowed.
1638          */
1639         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1640                                 mdd_object_capa(env, mdd_tpobj));
1641         if (rc != 0 && rc != -ENOENT)
1642                 GOTO(cleanup, rc);
1643
1644         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1645                                 mdd_object_capa(env, mdd_tpobj));
1646         if (rc)
1647                 GOTO(cleanup, rc);
1648
1649         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1650         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1651
1652         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1653         if (mdd_sobj) {
1654                 la->la_valid = LA_CTIME;
1655                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1656                                                         handle, 0);
1657                 if (rc)
1658                         GOTO(cleanup, rc);
1659         }
1660
1661         /* 
1662          * For tobj is remote case cmm layer has processed
1663          * and set tobj to NULL then. So when tobj is NOT NULL,
1664          * it must be local one.
1665          */
1666         if (tobj && mdd_object_exists(mdd_tobj)) {
1667                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1668                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1669
1670                 /* Remove dot reference. */
1671                 if (is_dir)
1672                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1673
1674                 la->la_valid = LA_CTIME;
1675                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1676                 if (rc)
1677                         GOTO(cleanup, rc);
1678
1679                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1680                 mdd_write_unlock(env, mdd_tobj);
1681                 if (rc)
1682                         GOTO(cleanup, rc);
1683         }
1684
1685         la->la_valid = LA_CTIME | LA_MTIME;
1686         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
1687         if (rc)
1688                 GOTO(cleanup, rc);
1689
1690         if (mdd_spobj != mdd_tpobj) {
1691                 la->la_valid = LA_CTIME | LA_MTIME;
1692                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
1693                                                   handle, 0);
1694         }
1695
1696         EXIT;
1697 cleanup:
1698         if (likely(tdlh) && sdlh != tdlh)
1699                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1700         if (likely(sdlh))
1701                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1702 cleanup_unlocked:
1703         mdd_trans_stop(env, mdd, rc, handle);
1704         if (mdd_sobj)
1705                 mdd_object_put(env, mdd_sobj);
1706         return rc;
1707 }
1708
1709 struct md_dir_operations mdd_dir_ops = {
1710         .mdo_is_subdir     = mdd_is_subdir,
1711         .mdo_lookup        = mdd_lookup,
1712         .mdo_create        = mdd_create,
1713         .mdo_rename        = mdd_rename,
1714         .mdo_link          = mdd_link,
1715         .mdo_unlink        = mdd_unlink,
1716         .mdo_name_insert   = mdd_name_insert,
1717         .mdo_name_remove   = mdd_name_remove,
1718         .mdo_rename_tgt    = mdd_rename_tgt,
1719         .mdo_create_data   = mdd_create_data
1720 };