Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 static int mdd_lookup(const struct lu_env *env,
93                       struct md_object *pobj, const struct lu_name *lname,
94                       struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102
103 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
104                           struct lu_fid *fid)
105 {
106         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
107 }
108
109 /*
110  * For root fid use special function, whcih does not compare version component
111  * of fid. Vresion component is different for root fids on all MDTs.
112  */
113 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
114 {
115         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
116                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
117 }
118
119 /*
120  * return 1: if lf is the fid of the ancestor of p1;
121  * return 0: if not;
122  *
123  * return -EREMOTE: if remote object is found, in this
124  * case fid of remote object is saved to @pf;
125  *
126  * otherwise: values < 0, errors.
127  */
128 static int mdd_is_parent(const struct lu_env *env,
129                          struct mdd_device *mdd,
130                          struct mdd_object *p1,
131                          const struct lu_fid *lf,
132                          struct lu_fid *pf)
133 {
134         struct mdd_object *parent = NULL;
135         struct lu_fid *pfid;
136         int rc;
137         ENTRY;
138
139         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
140         pfid = &mdd_env_info(env)->mti_fid;
141
142         /* Check for root first. */
143         if (mdd_is_root(mdd, mdo2fid(p1)))
144                 RETURN(0);
145
146         for(;;) {
147                 /* this is done recursively, bypass capa for each obj */
148                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
149                 rc = mdd_parent_fid(env, p1, pfid);
150                 if (rc)
151                         GOTO(out, rc);
152                 if (mdd_is_root(mdd, pfid))
153                         GOTO(out, rc = 0);
154                 if (lu_fid_eq(pfid, lf))
155                         GOTO(out, rc = 1);
156                 if (parent)
157                         mdd_object_put(env, parent);
158                 parent = mdd_object_find(env, mdd, pfid);
159
160                 /* cross-ref parent */
161                 if (parent == NULL) {
162                         if (pf != NULL)
163                                 *pf = *pfid;
164                         GOTO(out, rc = -EREMOTE);
165                 } else if (IS_ERR(parent))
166                         GOTO(out, rc = PTR_ERR(parent));
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 static int mdd_is_subdir(const struct lu_env *env,
188                          struct md_object *mo, const struct lu_fid *fid,
189                          struct lu_fid *sfid)
190 {
191         struct mdd_device *mdd = mdo2mdd(mo);
192         int rc;
193         ENTRY;
194
195         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
196                 RETURN(0);
197
198         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
199         if (rc == 0) {
200                 /* found root */
201                 fid_zero(sfid);
202         } else if (rc == 1) {
203                 /* found @fid is parent */
204                 *sfid = *fid;
205                 rc = 0;
206         }
207         RETURN(rc);
208 }
209
210 /*
211  * Check that @dir contains no entries except (possibly) dot and dotdot.
212  *
213  * Returns:
214  *
215  *             0        empty
216  *      -ENOTDIR        not a directory object
217  *    -ENOTEMPTY        not empty
218  *           -ve        other error
219  *
220  */
221 static int mdd_dir_is_empty(const struct lu_env *env,
222                             struct mdd_object *dir)
223 {
224         struct dt_it     *it;
225         struct dt_object *obj;
226         const struct dt_it_ops *iops;
227         int result;
228         ENTRY;
229
230         obj = mdd_object_child(dir);
231         if (!dt_try_as_dir(env, obj))
232                 RETURN(-ENOTDIR);
233
234         iops = &obj->do_index_ops->dio_it;
235         it = iops->init(env, obj, 0, BYPASS_CAPA);
236         if (it != NULL) {
237                 result = iops->get(env, it, (const void *)"");
238                 if (result > 0) {
239                         int i;
240                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
241                                 result = iops->next(env, it);
242                         if (result == 0)
243                                 result = -ENOTEMPTY;
244                         else if (result == +1)
245                                 result = 0;
246                 } else if (result == 0)
247                         /*
248                          * Huh? Index contains no zero key?
249                          */
250                         result = -EIO;
251
252                 iops->put(env, it);
253                 iops->fini(env, it);
254         } else
255                 result = -ENOMEM;
256         RETURN(result);
257 }
258
259 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
260 {
261         struct mdd_device *m = mdd_obj2mdd_dev(obj);
262         struct lu_attr *la = &mdd_env_info(env)->mti_la;
263         int rc;
264         ENTRY;
265
266         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
267         if (rc)
268                 RETURN(rc);
269
270         /*
271          * Subdir count limitation can be broken through.
272          */
273         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
274             !S_ISDIR(la->la_mode))
275                 RETURN(-EMLINK);
276         else
277                 RETURN(0);
278 }
279
280 /*
281  * Check whether it may create the cobj under the pobj.
282  * cobj maybe NULL
283  */
284 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
285                    struct mdd_object *cobj, int check_perm, int check_nlink)
286 {
287         int rc = 0;
288         ENTRY;
289
290         if (cobj && mdd_object_exists(cobj))
291                 RETURN(-EEXIST);
292
293         if (mdd_is_dead_obj(pobj))
294                 RETURN(-ENOENT);
295
296         if (check_perm)
297                 rc = mdd_permission_internal_locked(env, pobj, NULL,
298                                                     MAY_WRITE | MAY_EXEC,
299                                                     MOR_TGT_PARENT);
300
301         if (!rc && check_nlink)
302                 rc = __mdd_may_link(env, pobj);
303
304         RETURN(rc);
305 }
306
307 /*
308  * Check whether can unlink from the pobj in the case of "cobj == NULL".
309  */
310 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
311                    const struct md_attr *ma)
312 {
313         int rc;
314         ENTRY;
315
316         if (mdd_is_dead_obj(pobj))
317                 RETURN(-ENOENT);
318
319         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
320             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
321                 RETURN(-EPERM);
322
323         rc = mdd_permission_internal_locked(env, pobj, NULL,
324                                             MAY_WRITE | MAY_EXEC,
325                                             MOR_TGT_PARENT);
326         if (rc)
327                 RETURN(rc);
328
329         if (mdd_is_append(pobj))
330                 RETURN(-EPERM);
331
332         RETURN(rc);
333 }
334
335 /*
336  * pobj == NULL is remote ops case, under such case, pobj's
337  * VTX feature has been checked already, no need check again.
338  */
339 static inline int mdd_is_sticky(const struct lu_env *env,
340                                 struct mdd_object *pobj,
341                                 struct mdd_object *cobj)
342 {
343         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
344         struct md_ucred *uc = md_ucred(env);
345         int rc;
346
347         if (pobj) {
348                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
349                 if (rc)
350                         return rc;
351
352                 if (!(tmp_la->la_mode & S_ISVTX) ||
353                      (tmp_la->la_uid == uc->mu_fsuid))
354                         return 0;
355         }
356
357         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
358         if (rc)
359                 return rc;
360
361         if (tmp_la->la_uid == uc->mu_fsuid)
362                 return 0;
363
364         return !mdd_capable(uc, CFS_CAP_FOWNER);
365 }
366
367 /*
368  * Check whether it may delete the cobj from the pobj.
369  * pobj maybe NULL
370  */
371 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
372                    struct mdd_object *cobj, struct md_attr *ma,
373                    int check_perm, int check_empty)
374 {
375         int rc = 0;
376         ENTRY;
377
378         LASSERT(cobj);
379         if (!mdd_object_exists(cobj))
380                 RETURN(-ENOENT);
381
382         if (pobj) {
383                 if (mdd_is_dead_obj(pobj))
384                         RETURN(-ENOENT);
385
386                 if (check_perm) {
387                         rc = mdd_permission_internal_locked(env, pobj, NULL,
388                                                     MAY_WRITE | MAY_EXEC,
389                                                     MOR_TGT_PARENT);
390                         if (rc)
391                                 RETURN(rc);
392                 }
393
394                 if (mdd_is_append(pobj))
395                         RETURN(-EPERM);
396         }
397
398         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
399             mdd_is_sticky(env, pobj, cobj))
400                 RETURN(-EPERM);
401
402         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
403                 RETURN(-EPERM);
404
405         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
406             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
407                 RETURN(-EPERM);
408
409         if (S_ISDIR(ma->ma_attr.la_mode)) {
410                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
411
412                 if (!S_ISDIR(mdd_object_type(cobj)))
413                         RETURN(-ENOTDIR);
414
415                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
416                         RETURN(-EBUSY);
417         } else if (S_ISDIR(mdd_object_type(cobj)))
418                 RETURN(-EISDIR);
419
420         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
421                 rc = mdd_dir_is_empty(env, cobj);
422
423         RETURN(rc);
424 }
425
426 /*
427  * tgt maybe NULL
428  * has mdd_write_lock on src already, but not on tgt yet
429  */
430 int mdd_link_sanity_check(const struct lu_env *env,
431                           struct mdd_object *tgt_obj,
432                           const struct lu_name *lname,
433                           struct mdd_object *src_obj)
434 {
435         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
436         int rc = 0;
437         ENTRY;
438
439         /* Local ops, no lookup before link, check filename length here. */
440         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
441                 RETURN(-ENAMETOOLONG);
442
443         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
444                 RETURN(-EPERM);
445
446         if (S_ISDIR(mdd_object_type(src_obj)))
447                 RETURN(-EPERM);
448
449         LASSERT(src_obj != tgt_obj);
450         if (tgt_obj) {
451                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
452                 if (rc)
453                         RETURN(rc);
454         }
455
456         rc = __mdd_may_link(env, src_obj);
457
458         RETURN(rc);
459 }
460
461 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
462                                    const struct lu_fid *fid)
463 {
464         struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
465
466         fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
467         return (const struct dt_rec *)pack;
468 }
469
470 /**
471  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
472  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
473  * (maybe too large to be represented). It is a trick to break through the
474  * "i_nlink" limitation for subdir count.
475  */
476 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
477                    struct thandle *handle)
478 {
479         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
480         struct mdd_device *m = mdd_obj2mdd_dev(obj);
481
482         if (!mdd_is_mnlink(obj)) {
483                 if (S_ISDIR(mdd_object_type(obj))) {
484                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
485                                 return;
486
487                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
488                                 obj->mod_flags |= MNLINK_OBJ;
489                                 tmp_la->la_nlink = 1;
490                                 tmp_la->la_valid = LA_NLINK;
491                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
492                                                       0);
493                                 return;
494                         }
495                 }
496                 mdo_ref_add(env, obj, handle);
497         }
498 }
499
500 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
501                    struct thandle *handle, int is_dot)
502 {
503         if (!mdd_is_mnlink(obj) || is_dot)
504                 mdo_ref_del(env, obj, handle);
505 }
506
507 /* insert named index, add reference if isdir */
508 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
509                               const struct lu_fid *lf, const char *name, int is_dir,
510                               struct thandle *handle, struct lustre_capa *capa)
511 {
512         struct dt_object *next = mdd_object_child(pobj);
513         int               rc;
514         ENTRY;
515
516         if (dt_try_as_dir(env, next)) {
517                 struct md_ucred  *uc = md_ucred(env);
518
519                 rc = next->do_index_ops->dio_insert(env, next,
520                                                     __mdd_fid_rec(env, lf),
521                                                     (const struct dt_key *)name,
522                                                     handle, capa, uc->mu_cap &
523                                                     CFS_CAP_SYS_RESOURCE_MASK);
524         } else {
525                 rc = -ENOTDIR;
526         }
527
528         if (rc == 0) {
529                 if (is_dir) {
530                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
531                         __mdd_ref_add(env, pobj, handle);
532                         mdd_write_unlock(env, pobj);
533                 }
534         }
535         RETURN(rc);
536 }
537
538 /* delete named index, drop reference if isdir */
539 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
540                               const char *name, int is_dir, struct thandle *handle,
541                               struct lustre_capa *capa)
542 {
543         struct dt_object *next = mdd_object_child(pobj);
544         int               rc;
545         ENTRY;
546
547         if (dt_try_as_dir(env, next)) {
548                 rc = next->do_index_ops->dio_delete(env, next,
549                                                     (struct dt_key *)name,
550                                                     handle, capa);
551                 if (rc == 0 && is_dir) {
552                         int is_dot = 0;
553
554                         if (name != NULL && name[0] == '.' && name[1] == 0)
555                                 is_dot = 1;
556                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
557                         __mdd_ref_del(env, pobj, handle, is_dot);
558                         mdd_write_unlock(env, pobj);
559                 }
560         } else
561                 rc = -ENOTDIR;
562
563         RETURN(rc);
564 }
565
566 static int
567 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
568                         const struct lu_fid *lf, const char *name,
569                         struct thandle *handle, struct lustre_capa *capa)
570 {
571         struct dt_object *next = mdd_object_child(pobj);
572         int               rc;
573         ENTRY;
574
575         if (dt_try_as_dir(env, next)) {
576                 struct md_ucred  *uc = md_ucred(env);
577
578                 rc = next->do_index_ops->dio_insert(env, next,
579                                                     __mdd_fid_rec(env, lf),
580                                                     (const struct dt_key *)name,
581                                                     handle, capa, uc->mu_cap &
582                                                     CFS_CAP_SYS_RESOURCE_MASK);
583         } else {
584                 rc = -ENOTDIR;
585         }
586         RETURN(rc);
587 }
588
589 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
590                     struct md_object *src_obj, const struct lu_name *lname,
591                     struct md_attr *ma)
592 {
593         char *name = lname->ln_name;
594         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
595         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
596         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
597         struct mdd_device *mdd = mdo2mdd(src_obj);
598         struct dynlock_handle *dlh;
599         struct thandle *handle;
600 #ifdef HAVE_QUOTA_SUPPORT
601         struct obd_device *obd = mdd->mdd_obd_dev;
602         struct mds_obd *mds = &obd->u.mds;
603         unsigned int qids[MAXQUOTAS] = { 0, 0 };
604         int quota_opc = 0, rec_pending = 0;
605 #endif
606         int rc;
607         ENTRY;
608
609 #ifdef HAVE_QUOTA_SUPPORT
610         if (mds->mds_quota) {
611                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
612
613                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
614                 if (!rc) {
615                         quota_opc = FSFILT_OP_LINK;
616                         mdd_quota_wrapper(la_tmp, qids);
617                         /* get block quota for parent */
618                         lquota_chkquota(mds_quota_interface_ref, obd,
619                                         qids[USRQUOTA], qids[GRPQUOTA], 1,
620                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK);
621                 }
622         }
623 #endif
624
625         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
626         handle = mdd_trans_start(env, mdd);
627         if (IS_ERR(handle))
628                 GOTO(out_pending, rc = PTR_ERR(handle));
629
630         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
631         if (dlh == NULL)
632                 GOTO(out_trans, rc = -ENOMEM);
633         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
634
635         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
636         if (rc)
637                 GOTO(out_unlock, rc);
638
639         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
640                                      name, handle,
641                                      mdd_object_capa(env, mdd_tobj));
642         if (rc)
643                 GOTO(out_unlock, rc);
644
645         __mdd_ref_add(env, mdd_sobj, handle);
646
647         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
648         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
649
650         la->la_valid = LA_CTIME | LA_MTIME;
651         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
652         if (rc)
653                 GOTO(out_unlock, rc);
654
655         la->la_valid = LA_CTIME;
656         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
657         EXIT;
658 out_unlock:
659         mdd_write_unlock(env, mdd_sobj);
660         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
661 out_trans:
662         mdd_trans_stop(env, mdd, rc, handle);
663 out_pending:
664 #ifdef HAVE_QUOTA_SUPPORT
665         if (quota_opc) {
666                 if (rec_pending)
667                         lquota_pending_commit(mds_quota_interface_ref, obd,
668                                               qids[USRQUOTA], qids[GRPQUOTA],
669                                               1, 1);
670                 /* Trigger dqacq for the parent owner. If failed,
671                  * the next call for lquota_chkquota will process it. */
672                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
673                               quota_opc);
674         }
675 #endif
676         return rc;
677 }
678
679 /* caller should take a lock before calling */
680 int mdd_finish_unlink(const struct lu_env *env,
681                       struct mdd_object *obj, struct md_attr *ma,
682                       struct thandle *th)
683 {
684         int rc;
685         ENTRY;
686
687         rc = mdd_iattr_get(env, obj, ma);
688         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
689                 /* add new orphan and the object
690                  * will be deleted during the object_put() */
691                 if (__mdd_orphan_add(env, obj, th) == 0)
692                         obj->mod_flags |= ORPHAN_OBJ;
693
694                 obj->mod_flags |= DEAD_OBJ;
695                 if (obj->mod_count == 0)
696                         rc = mdd_object_kill(env, obj, ma);
697                 else
698                         /* clear MA_LOV | MA_COOKIE, if we do not
699                          * unlink it in case we get it somewhere */
700                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
701         } else
702                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
703
704         RETURN(rc);
705 }
706
707 /*
708  * pobj maybe NULL
709  * has mdd_write_lock on cobj already, but not on pobj yet
710  */
711 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
712                             struct mdd_object *cobj, struct md_attr *ma)
713 {
714         int rc;
715         ENTRY;
716
717         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
718
719         RETURN(rc);
720 }
721
722 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
723                       struct md_object *cobj, const struct lu_name *lname,
724                       struct md_attr *ma)
725 {
726         char *name = lname->ln_name;
727         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
728         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
729         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
730         struct mdd_device *mdd = mdo2mdd(pobj);
731         struct dynlock_handle *dlh;
732         struct thandle    *handle;
733 #ifdef HAVE_QUOTA_SUPPORT
734         struct obd_device *obd = mdd->mdd_obd_dev;
735         struct mds_obd *mds = &obd->u.mds;
736         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
737         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
738         int quota_opc = 0;
739 #endif
740         int rc, is_dir;
741         ENTRY;
742
743         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
744                  PFID(mdd_object_fid(mdd_cobj)));
745
746         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
747         if (rc)
748                 RETURN(rc);
749
750         handle = mdd_trans_start(env, mdd);
751         if (IS_ERR(handle))
752                 RETURN(PTR_ERR(handle));
753
754
755         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
756         if (dlh == NULL)
757                 GOTO(out_trans, rc = -ENOMEM);
758         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
759
760         is_dir = S_ISDIR(ma->ma_attr.la_mode);
761         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
762         if (rc)
763                 GOTO(cleanup, rc);
764
765         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
766                                 mdd_object_capa(env, mdd_pobj));
767         if (rc)
768                 GOTO(cleanup, rc);
769
770         __mdd_ref_del(env, mdd_cobj, handle, 0);
771         if (is_dir)
772                 /* unlink dot */
773                 __mdd_ref_del(env, mdd_cobj, handle, 1);
774
775         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
776         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
777
778         la->la_valid = LA_CTIME | LA_MTIME;
779         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
780         if (rc)
781                 GOTO(cleanup, rc);
782
783         la->la_valid = LA_CTIME;
784         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
785         if (rc)
786                 GOTO(cleanup, rc);
787
788         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
789 #ifdef HAVE_QUOTA_SUPPORT
790         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
791             ma->ma_attr.la_nlink == 0) {
792                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
793
794                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
795                 if (!rc) {
796                         mdd_quota_wrapper(la_tmp, qpids);
797                         if (mdd_cobj->mod_count == 0) {
798                                 quota_opc = FSFILT_OP_UNLINK;
799                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
800                         } else {
801                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
802                         }
803                 }
804         }
805 #endif
806
807         if (rc == 0)
808                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
809                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
810                                    NULL, NULL);
811         EXIT;
812 cleanup:
813         mdd_write_unlock(env, mdd_cobj);
814         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
815 out_trans:
816         mdd_trans_stop(env, mdd, rc, handle);
817 #ifdef HAVE_QUOTA_SUPPORT
818         if (quota_opc)
819                 /* Trigger dqrel on the owner of child and parent. If failed,
820                  * the next call for lquota_chkquota will process it. */
821                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
822                               quota_opc);
823 #endif
824         return rc;
825 }
826
827 /* has not lock on pobj yet */
828 static int mdd_ni_sanity_check(const struct lu_env *env,
829                                struct md_object *pobj,
830                                const struct md_attr *ma)
831 {
832         struct mdd_object *obj = md2mdd_obj(pobj);
833         int rc;
834         ENTRY;
835
836         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
837                 RETURN(0);
838
839         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
840
841         RETURN(rc);
842 }
843
844 /*
845  * Partial operation.
846  */
847 static int mdd_name_insert(const struct lu_env *env,
848                            struct md_object *pobj,
849                            const struct lu_name *lname,
850                            const struct lu_fid *fid,
851                            const struct md_attr *ma)
852 {
853         char *name = lname->ln_name;
854         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
855         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
856         struct mdd_device *mdd = mdo2mdd(pobj);
857         struct dynlock_handle *dlh;
858         struct thandle *handle;
859         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
860 #ifdef HAVE_QUOTA_SUPPORT
861         struct md_ucred *uc = md_ucred(env);
862         struct obd_device *obd = mdd->mdd_obd_dev;
863         struct mds_obd *mds = &obd->u.mds;
864         unsigned int qids[MAXQUOTAS] = { 0, 0 };
865         int quota_opc = 0, rec_pending = 0;
866         cfs_cap_t save = uc->mu_cap;
867 #endif
868         int rc;
869         ENTRY;
870
871 #ifdef HAVE_QUOTA_SUPPORT
872         if (mds->mds_quota) {
873                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
874                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
875
876                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
877                         if (!rc) {
878                                 quota_opc = FSFILT_OP_LINK;
879                                 mdd_quota_wrapper(la_tmp, qids);
880                                 /* get block quota for parent */
881                                 lquota_chkquota(mds_quota_interface_ref, obd,
882                                                 qids[USRQUOTA], qids[GRPQUOTA],
883                                                 1, &rec_pending, NULL,
884                                                 LQUOTA_FLAGS_BLK);
885                         }
886                 } else {
887                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
888                 }
889         }
890 #endif
891         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
892         handle = mdd_trans_start(env, mdo2mdd(pobj));
893         if (IS_ERR(handle))
894                 GOTO(out_pending, rc = PTR_ERR(handle));
895
896         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
897         if (dlh == NULL)
898                 GOTO(out_trans, rc = -ENOMEM);
899
900         rc = mdd_ni_sanity_check(env, pobj, ma);
901         if (rc)
902                 GOTO(out_unlock, rc);
903
904         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
905                                 handle, BYPASS_CAPA);
906         if (rc)
907                 GOTO(out_unlock, rc);
908
909         /*
910          * For some case, no need update obj's ctime (LA_CTIME is not set),
911          * e.g. split_dir.
912          * For other cases, update obj's ctime (LA_CTIME is set),
913          * e.g. cmr_link.
914          */
915         if (ma->ma_attr.la_valid & LA_CTIME) {
916                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
917                 la->la_valid = LA_CTIME | LA_MTIME;
918                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
919                                                         handle, 0);
920         }
921         EXIT;
922 out_unlock:
923         mdd_pdo_write_unlock(env, mdd_obj, dlh);
924 out_trans:
925         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
926 out_pending:
927 #ifdef HAVE_QUOTA_SUPPORT
928         if (mds->mds_quota) {
929                 if (quota_opc) {
930                         if (rec_pending)
931                                 lquota_pending_commit(mds_quota_interface_ref,
932                                                       obd, qids[USRQUOTA],
933                                                       qids[GRPQUOTA], 1, 1);
934                         /* Trigger dqacq for the parent owner. If failed,
935                          * the next call for lquota_chkquota will process it*/
936                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
937                                       rc, quota_opc);
938                 } else {
939                         uc->mu_cap = save;
940                 }
941         }
942 #endif
943         return rc;
944 }
945
946 /* has not lock on pobj yet */
947 static int mdd_nr_sanity_check(const struct lu_env *env,
948                                struct md_object *pobj,
949                                const struct md_attr *ma)
950 {
951         struct mdd_object *obj = md2mdd_obj(pobj);
952         int rc;
953         ENTRY;
954
955         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
956                 RETURN(0);
957
958         rc = mdd_may_unlink(env, obj, ma);
959
960         RETURN(rc);
961 }
962
963 /*
964  * Partial operation.
965  */
966 static int mdd_name_remove(const struct lu_env *env,
967                            struct md_object *pobj,
968                            const struct lu_name *lname,
969                            const struct md_attr *ma)
970 {
971         char *name = lname->ln_name;
972         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
973         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
974         struct mdd_device *mdd = mdo2mdd(pobj);
975         struct dynlock_handle *dlh;
976         struct thandle *handle;
977         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
978 #ifdef HAVE_QUOTA_SUPPORT
979         struct obd_device *obd = mdd->mdd_obd_dev;
980         struct mds_obd *mds = &obd->u.mds;
981         unsigned int qids[MAXQUOTAS] = { 0, 0 };
982         int quota_opc = 0;
983 #endif
984         int rc;
985         ENTRY;
986
987 #ifdef HAVE_QUOTA_SUPPORT
988         if (mds->mds_quota) {
989                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
990
991                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
992                 if (!rc) {
993                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
994                         mdd_quota_wrapper(la_tmp, qids);
995                 }
996         }
997 #endif
998         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
999         handle = mdd_trans_start(env, mdd);
1000         if (IS_ERR(handle))
1001                 GOTO(out_pending, rc = PTR_ERR(handle));
1002
1003         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1004         if (dlh == NULL)
1005                 GOTO(out_trans, rc = -ENOMEM);
1006
1007         rc = mdd_nr_sanity_check(env, pobj, ma);
1008         if (rc)
1009                 GOTO(out_unlock, rc);
1010
1011         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1012                                 handle, BYPASS_CAPA);
1013         if (rc)
1014                 GOTO(out_unlock, rc);
1015
1016         /*
1017          * For some case, no need update obj's ctime (LA_CTIME is not set),
1018          * e.g. split_dir.
1019          * For other cases, update obj's ctime (LA_CTIME is set),
1020          * e.g. cmr_unlink.
1021          */
1022         if (ma->ma_attr.la_valid & LA_CTIME) {
1023                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1024                 la->la_valid = LA_CTIME | LA_MTIME;
1025                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1026                                                         handle, 0);
1027         }
1028         EXIT;
1029 out_unlock:
1030         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1031 out_trans:
1032         mdd_trans_stop(env, mdd, rc, handle);
1033 out_pending:
1034 #ifdef HAVE_QUOTA_SUPPORT
1035         /* Trigger dqrel for the parent owner.
1036          * If failed, the next call for lquota_chkquota will process it. */
1037         if (quota_opc)
1038                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1039                               quota_opc);
1040 #endif
1041         return rc;
1042 }
1043
1044 /*
1045  * tobj maybe NULL
1046  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1047  */
1048 static int mdd_rt_sanity_check(const struct lu_env *env,
1049                                struct mdd_object *tgt_pobj,
1050                                struct mdd_object *tobj,
1051                                struct md_attr *ma)
1052 {
1053         int rc;
1054         ENTRY;
1055
1056         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1057                 RETURN(0);
1058
1059         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1060          * exist. In fact, tobj must exist, otherwise the call trace will be:
1061          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1062          * When get here, tobj must be NOT NULL, the other case has been
1063          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1064          * MDS_PERM_BYPASS.
1065          * So check may_delete, but not check nlink of tgt_pobj. */
1066         LASSERT(tobj);
1067         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1068
1069         RETURN(rc);
1070 }
1071
1072 static int mdd_rename_tgt(const struct lu_env *env,
1073                           struct md_object *pobj, struct md_object *tobj,
1074                           const struct lu_fid *lf, const struct lu_name *lname,
1075                           struct md_attr *ma)
1076 {
1077         char *name = lname->ln_name;
1078         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1079         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1080         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1081         struct mdd_device *mdd = mdo2mdd(pobj);
1082         struct dynlock_handle *dlh;
1083         struct thandle *handle;
1084 #ifdef HAVE_QUOTA_SUPPORT
1085         struct obd_device *obd = mdd->mdd_obd_dev;
1086         struct mds_obd *mds = &obd->u.mds;
1087         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1088         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1089         int quota_opc = 0, rec_pending = 0;
1090 #endif
1091         int rc;
1092         ENTRY;
1093
1094 #ifdef HAVE_QUOTA_SUPPORT
1095         if (mds->mds_quota && !tobj) {
1096                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1097
1098                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1099                 if (!rc) {
1100                         quota_opc = FSFILT_OP_LINK;
1101                         mdd_quota_wrapper(la_tmp, qpids);
1102                         /* get block quota for target parent */
1103                         lquota_chkquota(mds_quota_interface_ref, obd,
1104                                         qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1105                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK);
1106                 }
1107         }
1108 #endif
1109         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
1110         handle = mdd_trans_start(env, mdd);
1111         if (IS_ERR(handle))
1112                 GOTO(out_pending, rc = PTR_ERR(handle));
1113
1114         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1115         if (dlh == NULL)
1116                 GOTO(out_trans, rc = -ENOMEM);
1117         if (tobj)
1118                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1119
1120         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1121         if (rc)
1122                 GOTO(cleanup, rc);
1123
1124         /*
1125          * If rename_tgt is called then we should just re-insert name with
1126          * correct fid, no need to dec/inc parent nlink if obj is dir.
1127          */
1128         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1129         if (rc)
1130                 GOTO(cleanup, rc);
1131
1132         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1133                                      BYPASS_CAPA);
1134         if (rc)
1135                 GOTO(cleanup, rc);
1136
1137         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1138         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1139
1140         la->la_valid = LA_CTIME | LA_MTIME;
1141         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1142         if (rc)
1143                 GOTO(cleanup, rc);
1144
1145         /*
1146          * For tobj is remote case cmm layer has processed
1147          * and pass NULL tobj to here. So when tobj is NOT NULL,
1148          * it must be local one.
1149          */
1150         if (tobj && mdd_object_exists(mdd_tobj)) {
1151                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1152
1153                 /* Remove dot reference. */
1154                 if (S_ISDIR(ma->ma_attr.la_mode))
1155                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1156
1157                 la->la_valid = LA_CTIME;
1158                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1159                 if (rc)
1160                         GOTO(cleanup, rc);
1161
1162                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1163                 if (rc)
1164                         GOTO(cleanup, rc);
1165
1166 #ifdef HAVE_QUOTA_SUPPORT
1167                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1168                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1169                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1170                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1171                 }
1172 #endif
1173         }
1174         EXIT;
1175 cleanup:
1176         if (tobj)
1177                 mdd_write_unlock(env, mdd_tobj);
1178         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1179 out_trans:
1180         mdd_trans_stop(env, mdd, rc, handle);
1181 out_pending:
1182 #ifdef HAVE_QUOTA_SUPPORT
1183         if (mds->mds_quota) {
1184                 if (rec_pending)
1185                         lquota_pending_commit(mds_quota_interface_ref, obd,
1186                                               qpids[USRQUOTA],
1187                                               qpids[GRPQUOTA],
1188                                               1, 1);
1189                 if (quota_opc)
1190                         /* Trigger dqrel/dqacq on the target owner of child and
1191                          * parent. If failed, the next call for lquota_chkquota
1192                          * will process it. */
1193                         lquota_adjust(mds_quota_interface_ref, obd, qcids,
1194                                       qpids, rc, quota_opc);
1195         }
1196 #endif
1197         return rc;
1198 }
1199
1200 /*
1201  * The permission has been checked when obj created, no need check again.
1202  */
1203 static int mdd_cd_sanity_check(const struct lu_env *env,
1204                                struct mdd_object *obj)
1205 {
1206         ENTRY;
1207
1208         /* EEXIST check */
1209         if (!obj || mdd_is_dead_obj(obj))
1210                 RETURN(-ENOENT);
1211
1212         RETURN(0);
1213
1214 }
1215
1216 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1217                            struct md_object *cobj, const struct md_op_spec *spec,
1218                            struct md_attr *ma)
1219 {
1220         struct mdd_device *mdd = mdo2mdd(cobj);
1221         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1222         struct mdd_object *son = md2mdd_obj(cobj);
1223         struct lu_attr    *attr = &ma->ma_attr;
1224         struct lov_mds_md *lmm = NULL;
1225         int                lmm_size = 0;
1226         struct thandle    *handle;
1227         int                rc;
1228         ENTRY;
1229
1230         rc = mdd_cd_sanity_check(env, son);
1231         if (rc)
1232                 RETURN(rc);
1233
1234         if (!md_should_create(spec->sp_cr_flags))
1235                 RETURN(0);
1236
1237         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1238                             spec, attr);
1239         if (rc)
1240                 RETURN(rc);
1241
1242         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1243         handle = mdd_trans_start(env, mdd);
1244         if (IS_ERR(handle))
1245                 GOTO(out_free, rc = PTR_ERR(handle));
1246
1247         /*
1248          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1249          * Should this be fixed?
1250          */
1251
1252         /* Replay creates has objects already */
1253 #if 0
1254         if (spec->u.sp_ea.no_lov_create) {
1255                 CDEBUG(D_INFO, "we already have lov ea\n");
1256                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1257                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1258                                     spec->u.sp_ea.eadatalen, handle, 0);
1259         } else
1260 #endif
1261                 /* No need mdd_lsm_sanity_check here */
1262                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1263                                     lmm_size, handle, 0);
1264
1265         if (rc == 0)
1266                rc = mdd_attr_get_internal_locked(env, son, ma);
1267
1268         /* update lov_objid data, must be before transaction stop! */
1269         if (rc == 0)
1270                 mdd_lov_objid_update(mdd, lmm);
1271
1272         mdd_trans_stop(env, mdd, rc, handle);
1273 out_free:
1274         /* Finish mdd_lov_create() stuff. */
1275         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1276         RETURN(rc);
1277 }
1278
1279 static int
1280 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1281              const struct lu_name *lname, struct lu_fid* fid, int mask)
1282 {
1283         char                *name = lname->ln_name;
1284         const struct dt_key *key = (const struct dt_key *)name;
1285         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1286         struct mdd_device   *m = mdo2mdd(pobj);
1287         struct dt_object    *dir = mdd_object_child(mdd_obj);
1288         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1289         int rc;
1290         ENTRY;
1291
1292         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1293                 RETURN(-ESTALE);
1294
1295         rc = mdd_object_exists(mdd_obj);
1296         if (unlikely(rc == 0))
1297                 RETURN(-ESTALE);
1298         else if (unlikely(rc < 0)) {
1299                 CERROR("Object "DFID" locates on remote server\n",
1300                         PFID(mdo2fid(mdd_obj)));
1301                 LBUG();
1302         }
1303
1304         /* The common filename length check. */
1305         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1306                 RETURN(-ENAMETOOLONG);
1307
1308         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1309                                             MOR_TGT_PARENT);
1310         if (rc)
1311                 RETURN(rc);
1312
1313         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1314                    dt_try_as_dir(env, dir))) {
1315                 rc = dir->do_index_ops->dio_lookup(env, dir,
1316                                                  (struct dt_rec *)pack, key,
1317                                                  mdd_object_capa(env, mdd_obj));
1318                 if (rc == 0)
1319                         rc = fid_unpack(pack, fid);
1320         } else
1321                 rc = -ENOTDIR;
1322
1323         RETURN(rc);
1324 }
1325
1326 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1327                           struct mdd_object *child, struct md_attr *ma,
1328                           struct thandle *handle)
1329 {
1330         int rc;
1331         ENTRY;
1332
1333         /*
1334          * Update attributes for child.
1335          *
1336          * FIXME:
1337          *  (1) the valid bits should be converted between Lustre and Linux;
1338          *  (2) maybe, the child attributes should be set in OSD when creation.
1339          */
1340
1341         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1342         if (rc != 0)
1343                 RETURN(rc);
1344
1345         if (S_ISDIR(ma->ma_attr.la_mode)) {
1346                 /* Add "." and ".." for newly created dir */
1347                 __mdd_ref_add(env, child, handle);
1348                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1349                                              dot, handle, BYPASS_CAPA);
1350                 if (rc == 0) {
1351                         rc = __mdd_index_insert_only(env, child, pfid,
1352                                                      dotdot, handle,
1353                                                      BYPASS_CAPA);
1354                         if (rc != 0) {
1355                                 int rc2;
1356
1357                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1358                                                          handle, BYPASS_CAPA);
1359                                 if (rc2 != 0)
1360                                         CERROR("Failure to cleanup after dotdot"
1361                                                " creation: %d (%d)\n", rc2, rc);
1362                         }
1363                 }
1364         }
1365         RETURN(rc);
1366 }
1367
1368 /* has not lock on pobj yet */
1369 static int mdd_create_sanity_check(const struct lu_env *env,
1370                                    struct md_object *pobj,
1371                                    const struct lu_name *lname,
1372                                    struct md_attr *ma,
1373                                    struct md_op_spec *spec)
1374 {
1375         struct mdd_thread_info *info = mdd_env_info(env);
1376         struct lu_attr    *la        = &info->mti_la;
1377         struct lu_fid     *fid       = &info->mti_fid;
1378         struct mdd_object *obj       = md2mdd_obj(pobj);
1379         struct mdd_device *m         = mdo2mdd(pobj);
1380         int lookup                   = spec->sp_cr_lookup;
1381         int rc;
1382         ENTRY;
1383
1384         /* EEXIST check */
1385         if (mdd_is_dead_obj(obj))
1386                 RETURN(-ENOENT);
1387
1388         /*
1389          * In some cases this lookup is not needed - we know before if name
1390          * exists or not because MDT performs lookup for it.
1391          * name length check is done in lookup.
1392          */
1393         if (lookup) {
1394                 /*
1395                  * Check if the name already exist, though it will be checked in
1396                  * _index_insert also, for avoiding rolling back if exists
1397                  * _index_insert.
1398                  */
1399                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1400                                          MAY_WRITE | MAY_EXEC);
1401                 if (rc != -ENOENT)
1402                         RETURN(rc ? : -EEXIST);
1403         } else {
1404                 /*
1405                  * Check WRITE permission for the parent.
1406                  * EXEC permission have been checked
1407                  * when lookup before create already.
1408                  */
1409                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1410                                                     MOR_TGT_PARENT);
1411                 if (rc)
1412                         RETURN(rc);
1413         }
1414
1415         /* sgid check */
1416         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1417         if (rc != 0)
1418                 RETURN(rc);
1419
1420         if (la->la_mode & S_ISGID) {
1421                 ma->ma_attr.la_gid = la->la_gid;
1422                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1423                         ma->ma_attr.la_mode |= S_ISGID;
1424                         ma->ma_attr.la_valid |= LA_MODE;
1425                 }
1426         }
1427
1428         switch (ma->ma_attr.la_mode & S_IFMT) {
1429         case S_IFLNK: {
1430                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1431
1432                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1433                         RETURN(-ENAMETOOLONG);
1434                 else
1435                         RETURN(0);
1436         }
1437         case S_IFDIR:
1438         case S_IFREG:
1439         case S_IFCHR:
1440         case S_IFBLK:
1441         case S_IFIFO:
1442         case S_IFSOCK:
1443                 rc = 0;
1444                 break;
1445         default:
1446                 rc = -EINVAL;
1447                 break;
1448         }
1449         RETURN(rc);
1450 }
1451
1452 /*
1453  * Create object and insert it into namespace.
1454  */
1455 static int mdd_create(const struct lu_env *env,
1456                       struct md_object *pobj,
1457                       const struct lu_name *lname,
1458                       struct md_object *child,
1459                       struct md_op_spec *spec,
1460                       struct md_attr* ma)
1461 {
1462         struct mdd_thread_info *info = mdd_env_info(env);
1463         struct lu_attr         *la = &info->mti_la_for_fix;
1464         struct md_attr         *ma_acl = &info->mti_ma;
1465         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1466         struct mdd_object      *son = md2mdd_obj(child);
1467         struct mdd_device      *mdd = mdo2mdd(pobj);
1468         struct lu_attr         *attr = &ma->ma_attr;
1469         struct lov_mds_md      *lmm = NULL;
1470         struct thandle         *handle;
1471         struct dynlock_handle  *dlh;
1472         char                   *name = lname->ln_name;
1473         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1474         int got_def_acl = 0;
1475 #ifdef HAVE_QUOTA_SUPPORT
1476         struct obd_device *obd = mdd->mdd_obd_dev;
1477         struct mds_obd *mds = &obd->u.mds;
1478         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1479         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1480         int quota_opc = 0, block_count = 0;
1481         int inode_pending = 0, block_pending = 0, parent_pending = 0;
1482 #endif
1483         ENTRY;
1484
1485         /*
1486          * Two operations have to be performed:
1487          *
1488          *  - an allocation of a new object (->do_create()), and
1489          *
1490          *  - an insertion into a parent index (->dio_insert()).
1491          *
1492          * Due to locking, operation order is not important, when both are
1493          * successful, *but* error handling cases are quite different:
1494          *
1495          *  - if insertion is done first, and following object creation fails,
1496          *  insertion has to be rolled back, but this operation might fail
1497          *  also leaving us with dangling index entry.
1498          *
1499          *  - if creation is done first, is has to be undone if insertion
1500          *  fails, leaving us with leaked space, which is neither good, nor
1501          *  fatal.
1502          *
1503          * It seems that creation-first is simplest solution, but it is
1504          * sub-optimal in the frequent
1505          *
1506          *         $ mkdir foo
1507          *         $ mkdir foo
1508          *
1509          * case, because second mkdir is bound to create object, only to
1510          * destroy it immediately.
1511          *
1512          * To avoid this follow local file systems that do double lookup:
1513          *
1514          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1515          *
1516          *     1. create            (mdd_object_create_internal())
1517          *
1518          *     2. insert            (__mdd_index_insert(), lookup again)
1519          */
1520
1521         /* Sanity checks before big job. */
1522         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1523         if (rc)
1524                 RETURN(rc);
1525
1526 #ifdef HAVE_QUOTA_SUPPORT
1527         if (mds->mds_quota) {
1528                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1529
1530                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1531                 if (!rc) {
1532                         int same = 0;
1533
1534                         quota_opc = FSFILT_OP_CREATE;
1535                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1536                         mdd_quota_wrapper(la_tmp, qpids);
1537                         /* get file quota for child */
1538                         lquota_chkquota(mds_quota_interface_ref, obd,
1539                                         qcids[USRQUOTA], qcids[GRPQUOTA], 1,
1540                                         &inode_pending, NULL, 0);
1541                         switch (ma->ma_attr.la_mode & S_IFMT) {
1542                         case S_IFLNK:
1543                         case S_IFDIR:
1544                                 block_count = 2;
1545                                 break;
1546                         case S_IFREG:
1547                                 block_count = 1;
1548                                 break;
1549                         }
1550                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1551                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1552                                 block_count += 1;
1553                                 same = 1;
1554                         }
1555                         /* get block quota for child and parent */
1556                         if (block_count)
1557                                 lquota_chkquota(mds_quota_interface_ref, obd,
1558                                                 qcids[USRQUOTA], qcids[GRPQUOTA],
1559                                                 block_count,
1560                                                 &block_pending, NULL,
1561                                                 LQUOTA_FLAGS_BLK);
1562                         if (!same)
1563                                 lquota_chkquota(mds_quota_interface_ref, obd,
1564                                                 qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1565                                                 &parent_pending, NULL,
1566                                                 LQUOTA_FLAGS_BLK);
1567                 }
1568         }
1569 #endif
1570
1571         /*
1572          * No RPC inside the transaction, so OST objects should be created at
1573          * first.
1574          */
1575         if (S_ISREG(attr->la_mode)) {
1576                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1577                                     spec, attr);
1578                 if (rc)
1579                         GOTO(out_pending, rc);
1580         }
1581
1582         if (!S_ISLNK(attr->la_mode)) {
1583                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1584                 ma_acl->ma_acl = info->mti_xattr_buf;
1585                 ma_acl->ma_need = MA_ACL_DEF;
1586                 ma_acl->ma_valid = 0;
1587
1588                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1589                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1590                 mdd_read_unlock(env, mdd_pobj);
1591                 if (rc)
1592                         GOTO(out_free, rc);
1593                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1594                         got_def_acl = 1;
1595         }
1596
1597         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1598         handle = mdd_trans_start(env, mdd);
1599         if (IS_ERR(handle))
1600                 GOTO(out_free, rc = PTR_ERR(handle));
1601
1602         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1603         if (dlh == NULL)
1604                 GOTO(out_trans, rc = -ENOMEM);
1605
1606         mdd_write_lock(env, son, MOR_TGT_CHILD);
1607         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
1608         if (rc) {
1609                 mdd_write_unlock(env, son);
1610                 GOTO(cleanup, rc);
1611         }
1612
1613         created = 1;
1614
1615 #ifdef CONFIG_FS_POSIX_ACL
1616         if (got_def_acl) {
1617                 struct lu_buf *acl_buf = &info->mti_buf;
1618                 acl_buf->lb_buf = ma_acl->ma_acl;
1619                 acl_buf->lb_len = ma_acl->ma_acl_size;
1620
1621                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1622                 if (rc) {
1623                         mdd_write_unlock(env, son);
1624                         GOTO(cleanup, rc);
1625                 } else {
1626                         ma->ma_attr.la_valid |= LA_MODE;
1627                 }
1628         }
1629 #endif
1630
1631         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1632                                    son, ma, handle);
1633         mdd_write_unlock(env, son);
1634         if (rc)
1635                 /*
1636                  * Object has no links, so it will be destroyed when last
1637                  * reference is released. (XXX not now.)
1638                  */
1639                 GOTO(cleanup, rc);
1640
1641         initialized = 1;
1642
1643         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1644                                 name, S_ISDIR(attr->la_mode), handle,
1645                                 mdd_object_capa(env, mdd_pobj));
1646
1647         if (rc)
1648                 GOTO(cleanup, rc);
1649
1650         inserted = 1;
1651
1652         /* No need mdd_lsm_sanity_check here */
1653         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1654         if (rc) {
1655                 CERROR("error on stripe info copy %d \n", rc);
1656                 GOTO(cleanup, rc);
1657         }
1658         if (lmm && lmm_size > 0) {
1659                 /* Set Lov here, do not get lmm again later */
1660                 memcpy(ma->ma_lmm, lmm, lmm_size);
1661                 ma->ma_lmm_size = lmm_size;
1662                 ma->ma_valid |= MA_LOV;
1663         }
1664
1665         if (S_ISLNK(attr->la_mode)) {
1666                 struct md_ucred  *uc = md_ucred(env);
1667                 struct dt_object *dt = mdd_object_child(son);
1668                 const char *target_name = spec->u.sp_symname;
1669                 int sym_len = strlen(target_name);
1670                 const struct lu_buf *buf;
1671                 loff_t pos = 0;
1672
1673                 buf = mdd_buf_get_const(env, target_name, sym_len);
1674                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1675                                                 mdd_object_capa(env, son),
1676                                                 uc->mu_cap &
1677                                                 CFS_CAP_SYS_RESOURCE_MASK);
1678
1679                 if (rc == sym_len)
1680                         rc = 0;
1681                 else
1682                         GOTO(cleanup, rc = -EFAULT);
1683         }
1684
1685         *la = ma->ma_attr;
1686         la->la_valid = LA_CTIME | LA_MTIME;
1687         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1688         if (rc)
1689                 GOTO(cleanup, rc);
1690
1691         /* Return attr back. */
1692         rc = mdd_attr_get_internal_locked(env, son, ma);
1693         EXIT;
1694 cleanup:
1695         if (rc && created) {
1696                 int rc2 = 0;
1697
1698                 if (inserted) {
1699                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1700                                                  S_ISDIR(attr->la_mode),
1701                                                  handle, BYPASS_CAPA);
1702                         if (rc2)
1703                                 CERROR("error can not cleanup destroy %d\n",
1704                                        rc2);
1705                 }
1706
1707                 if (rc2 == 0) {
1708                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1709                         __mdd_ref_del(env, son, handle, 0);
1710                         if (initialized && S_ISDIR(attr->la_mode))
1711                                 __mdd_ref_del(env, son, handle, 1);
1712                         mdd_write_unlock(env, son);
1713                 }
1714         }
1715
1716         /* update lov_objid data, must be before transaction stop! */
1717         if (rc == 0)
1718                 mdd_lov_objid_update(mdd, lmm);
1719
1720         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1721 out_trans:
1722         mdd_trans_stop(env, mdd, rc, handle);
1723 out_free:
1724         /* finis lov_create stuff, free all temporary data */
1725         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1726 out_pending:
1727 #ifdef HAVE_QUOTA_SUPPORT
1728         if (quota_opc) {
1729                 if (inode_pending)
1730                         lquota_pending_commit(mds_quota_interface_ref, obd,
1731                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1732                                               1, 0);
1733                 if (block_pending)
1734                         lquota_pending_commit(mds_quota_interface_ref, obd,
1735                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1736                                               block_count, 1);
1737                 if (parent_pending)
1738                         lquota_pending_commit(mds_quota_interface_ref, obd,
1739                                               qpids[USRQUOTA], qpids[GRPQUOTA],
1740                                               1, 1);
1741                 /* Trigger dqacq on the owner of child and parent. If failed,
1742                  * the next call for lquota_chkquota will process it. */
1743                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1744                               quota_opc);
1745         }
1746 #endif
1747         return rc;
1748 }
1749
1750 /*
1751  * Get locks on parents in proper order
1752  * RETURN: < 0 - error, rename_order if successful
1753  */
1754 enum rename_order {
1755         MDD_RN_SAME,
1756         MDD_RN_SRCTGT,
1757         MDD_RN_TGTSRC
1758 };
1759
1760 static int mdd_rename_order(const struct lu_env *env,
1761                             struct mdd_device *mdd,
1762                             struct mdd_object *src_pobj,
1763                             struct mdd_object *tgt_pobj)
1764 {
1765         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1766         int rc;
1767         ENTRY;
1768
1769         if (src_pobj == tgt_pobj)
1770                 RETURN(MDD_RN_SAME);
1771
1772         /* compared the parent child relationship of src_p&tgt_p */
1773         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1774                 rc = MDD_RN_SRCTGT;
1775         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1776                 rc = MDD_RN_TGTSRC;
1777         } else {
1778                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1779                 if (rc == -EREMOTE)
1780                         rc = 0;
1781
1782                 if (rc == 1)
1783                         rc = MDD_RN_TGTSRC;
1784                 else if (rc == 0)
1785                         rc = MDD_RN_SRCTGT;
1786         }
1787
1788         RETURN(rc);
1789 }
1790
1791 /* has not mdd_write{read}_lock on any obj yet. */
1792 static int mdd_rename_sanity_check(const struct lu_env *env,
1793                                    struct mdd_object *src_pobj,
1794                                    struct mdd_object *tgt_pobj,
1795                                    struct mdd_object *sobj,
1796                                    struct mdd_object *tobj,
1797                                    struct md_attr *ma)
1798 {
1799         int rc = 0;
1800         ENTRY;
1801
1802         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1803                 RETURN(0);
1804
1805         /* XXX: when get here, sobj must NOT be NULL,
1806          * the other case has been processed in cml_rename
1807          * before mdd_rename and enable MDS_PERM_BYPASS. */
1808         LASSERT(sobj);
1809         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1810         if (rc)
1811                 RETURN(rc);
1812
1813         /* XXX: when get here, "tobj == NULL" means tobj must
1814          * NOT exist (neither on remote MDS, such case has been
1815          * processed in cml_rename before mdd_rename and enable
1816          * MDS_PERM_BYPASS).
1817          * So check may_create, but not check may_unlink. */
1818         if (!tobj)
1819                 rc = mdd_may_create(env, tgt_pobj, NULL,
1820                                     (src_pobj != tgt_pobj), 0);
1821         else
1822                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1823                                     (src_pobj != tgt_pobj), 1);
1824
1825         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1826             S_ISDIR(ma->ma_attr.la_mode))
1827                 rc = __mdd_may_link(env, tgt_pobj);
1828
1829         RETURN(rc);
1830 }
1831
1832 /* src object can be remote that is why we use only fid and type of object */
1833 static int mdd_rename(const struct lu_env *env,
1834                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1835                       const struct lu_fid *lf, const struct lu_name *lsname,
1836                       struct md_object *tobj, const struct lu_name *ltname,
1837                       struct md_attr *ma)
1838 {
1839         char *sname = lsname->ln_name;
1840         char *tname = ltname->ln_name;
1841         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1842         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1843         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1844         struct mdd_device *mdd = mdo2mdd(src_pobj);
1845         struct mdd_object *mdd_sobj = NULL;
1846         struct mdd_object *mdd_tobj = NULL;
1847         struct dynlock_handle *sdlh, *tdlh;
1848         struct thandle *handle;
1849 #ifdef HAVE_QUOTA_SUPPORT
1850         struct obd_device *obd = mdd->mdd_obd_dev;
1851         struct mds_obd *mds = &obd->u.mds;
1852         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
1853         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
1854         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
1855         int quota_opc = 0, rec_pending = 0;
1856 #endif
1857         int rc, is_dir;
1858         ENTRY;
1859
1860         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1861         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1862
1863         if (tobj)
1864                 mdd_tobj = md2mdd_obj(tobj);
1865
1866 #ifdef HAVE_QUOTA_SUPPORT
1867         if (mds->mds_quota) {
1868                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1869
1870                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
1871                 if (!rc) {
1872                         mdd_quota_wrapper(la_tmp, qspids);
1873                         if (!tobj) {
1874                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
1875                                                 BYPASS_CAPA);
1876                                 if (!rc) {
1877                                         quota_opc = FSFILT_OP_LINK;
1878                                         mdd_quota_wrapper(la_tmp, qtpids);
1879                                         /* get block quota for target parent */
1880                                         lquota_chkquota(mds_quota_interface_ref,
1881                                                         obd, qtpids[USRQUOTA],
1882                                                         qtpids[GRPQUOTA], 1,
1883                                                         &rec_pending, NULL,
1884                                                         LQUOTA_FLAGS_BLK);
1885                                 }
1886                         }
1887                 }
1888         }
1889 #endif
1890         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1891         handle = mdd_trans_start(env, mdd);
1892         if (IS_ERR(handle))
1893                 GOTO(out_pending, rc = PTR_ERR(handle));
1894
1895         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1896         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1897         if (rc < 0)
1898                 GOTO(cleanup_unlocked, rc);
1899
1900         /* Get locks in determined order */
1901         if (rc == MDD_RN_SAME) {
1902                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
1903                                           sname, MOR_SRC_PARENT);
1904                 /* check hashes to determine do we need one lock or two */
1905                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1906                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
1907                                 MOR_TGT_PARENT);
1908                 else
1909                         tdlh = sdlh;
1910         } else if (rc == MDD_RN_SRCTGT) {
1911                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
1912                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
1913         } else {
1914                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
1915                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
1916         }
1917         if (sdlh == NULL || tdlh == NULL)
1918                 GOTO(cleanup, rc = -ENOMEM);
1919
1920         mdd_sobj = mdd_object_find(env, mdd, lf);
1921         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1922                                      mdd_sobj, mdd_tobj, ma);
1923         if (rc)
1924                 GOTO(cleanup, rc);
1925
1926         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1927                                 mdd_object_capa(env, mdd_spobj));
1928         if (rc)
1929                 GOTO(cleanup, rc);
1930
1931         /*
1932          * Here tobj can be remote one, so we do index_delete unconditionally
1933          * and -ENOENT is allowed.
1934          */
1935         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1936                                 mdd_object_capa(env, mdd_tpobj));
1937         if (rc != 0 && rc != -ENOENT)
1938                 GOTO(cleanup, rc);
1939
1940         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1941                                 mdd_object_capa(env, mdd_tpobj));
1942         if (rc)
1943                 GOTO(cleanup, rc);
1944
1945         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1946         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1947
1948         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1949         if (mdd_sobj) {
1950                 la->la_valid = LA_CTIME;
1951                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1952                                                         handle, 0);
1953                 if (rc)
1954                         GOTO(cleanup, rc);
1955         }
1956
1957         /*
1958          * For tobj is remote case cmm layer has processed
1959          * and set tobj to NULL then. So when tobj is NOT NULL,
1960          * it must be local one.
1961          */
1962         if (tobj && mdd_object_exists(mdd_tobj)) {
1963                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1964                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1965
1966                 /* Remove dot reference. */
1967                 if (is_dir)
1968                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1969
1970                 la->la_valid = LA_CTIME;
1971                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1972                 if (rc)
1973                         GOTO(cleanup, rc);
1974
1975                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1976                 mdd_write_unlock(env, mdd_tobj);
1977                 if (rc)
1978                         GOTO(cleanup, rc);
1979
1980 #ifdef HAVE_QUOTA_SUPPORT
1981                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1982                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1983                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1984                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
1985                 }
1986 #endif
1987         }
1988
1989         la->la_valid = LA_CTIME | LA_MTIME;
1990         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
1991         if (rc)
1992                 GOTO(cleanup, rc);
1993
1994         if (mdd_spobj != mdd_tpobj) {
1995                 la->la_valid = LA_CTIME | LA_MTIME;
1996                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
1997                                                   handle, 0);
1998         }
1999
2000         EXIT;
2001 cleanup:
2002         if (likely(tdlh) && sdlh != tdlh)
2003                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2004         if (likely(sdlh))
2005                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2006 cleanup_unlocked:
2007         mdd_trans_stop(env, mdd, rc, handle);
2008         if (mdd_sobj)
2009                 mdd_object_put(env, mdd_sobj);
2010 out_pending:
2011 #ifdef HAVE_QUOTA_SUPPORT
2012         if (mds->mds_quota) {
2013                 if (rec_pending)
2014                         lquota_pending_commit(mds_quota_interface_ref, obd,
2015                                               qtpids[USRQUOTA],
2016                                               qtpids[GRPQUOTA],
2017                                               1, 1);
2018                 /* Trigger dqrel on the source owner of parent.
2019                  * If failed, the next call for lquota_chkquota will
2020                  * process it. */
2021                 lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2022                               FSFILT_OP_UNLINK_PARTIAL_PARENT);
2023                 if (quota_opc)
2024                         /* Trigger dqrel/dqacq on the target owner of child and
2025                          * parent. If failed, the next call for lquota_chkquota
2026                          * will process it. */
2027                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2028                                       qtpids, rc, quota_opc);
2029         }
2030 #endif
2031         return rc;
2032 }
2033
2034 const struct md_dir_operations mdd_dir_ops = {
2035         .mdo_is_subdir     = mdd_is_subdir,
2036         .mdo_lookup        = mdd_lookup,
2037         .mdo_create        = mdd_create,
2038         .mdo_rename        = mdd_rename,
2039         .mdo_link          = mdd_link,
2040         .mdo_unlink        = mdd_unlink,
2041         .mdo_name_insert   = mdd_name_insert,
2042         .mdo_name_remove   = mdd_name_remove,
2043         .mdo_rename_tgt    = mdd_rename_tgt,
2044         .mdo_create_data   = mdd_create_data
2045 };