Whamcloud - gitweb
not send LOV EA under replay, we can't know about they size at this
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         const char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 static int mdd_lookup(const struct lu_env *env,
93                       struct md_object *pobj, const struct lu_name *lname,
94                       struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102
103 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
104                           struct lu_fid *fid)
105 {
106         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
107 }
108
109 /*
110  * For root fid use special function, whcih does not compare version component
111  * of fid. Vresion component is different for root fids on all MDTs.
112  */
113 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
114 {
115         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
116                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
117 }
118
119 /*
120  * return 1: if lf is the fid of the ancestor of p1;
121  * return 0: if not;
122  *
123  * return -EREMOTE: if remote object is found, in this
124  * case fid of remote object is saved to @pf;
125  *
126  * otherwise: values < 0, errors.
127  */
128 static int mdd_is_parent(const struct lu_env *env,
129                          struct mdd_device *mdd,
130                          struct mdd_object *p1,
131                          const struct lu_fid *lf,
132                          struct lu_fid *pf)
133 {
134         struct mdd_object *parent = NULL;
135         struct lu_fid *pfid;
136         int rc;
137         ENTRY;
138
139         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
140         pfid = &mdd_env_info(env)->mti_fid;
141
142         /* Check for root first. */
143         if (mdd_is_root(mdd, mdo2fid(p1)))
144                 RETURN(0);
145
146         for(;;) {
147                 /* this is done recursively, bypass capa for each obj */
148                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
149                 rc = mdd_parent_fid(env, p1, pfid);
150                 if (rc)
151                         GOTO(out, rc);
152                 if (mdd_is_root(mdd, pfid))
153                         GOTO(out, rc = 0);
154                 if (lu_fid_eq(pfid, lf))
155                         GOTO(out, rc = 1);
156                 if (parent)
157                         mdd_object_put(env, parent);
158                 parent = mdd_object_find(env, mdd, pfid);
159
160                 /* cross-ref parent */
161                 if (parent == NULL) {
162                         if (pf != NULL)
163                                 *pf = *pfid;
164                         GOTO(out, rc = -EREMOTE);
165                 } else if (IS_ERR(parent))
166                         GOTO(out, rc = PTR_ERR(parent));
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 static int mdd_is_subdir(const struct lu_env *env,
188                          struct md_object *mo, const struct lu_fid *fid,
189                          struct lu_fid *sfid)
190 {
191         struct mdd_device *mdd = mdo2mdd(mo);
192         int rc;
193         ENTRY;
194
195         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
196                 RETURN(0);
197
198         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
199         if (rc == 0) {
200                 /* found root */
201                 fid_zero(sfid);
202         } else if (rc == 1) {
203                 /* found @fid is parent */
204                 *sfid = *fid;
205                 rc = 0;
206         }
207         RETURN(rc);
208 }
209
210 /*
211  * Check that @dir contains no entries except (possibly) dot and dotdot.
212  *
213  * Returns:
214  *
215  *             0        empty
216  *      -ENOTDIR        not a directory object
217  *    -ENOTEMPTY        not empty
218  *           -ve        other error
219  *
220  */
221 static int mdd_dir_is_empty(const struct lu_env *env,
222                             struct mdd_object *dir)
223 {
224         struct dt_it     *it;
225         struct dt_object *obj;
226         const struct dt_it_ops *iops;
227         int result;
228         ENTRY;
229
230         obj = mdd_object_child(dir);
231         if (!dt_try_as_dir(env, obj))
232                 RETURN(-ENOTDIR);
233
234         iops = &obj->do_index_ops->dio_it;
235         it = iops->init(env, obj, BYPASS_CAPA);
236         if (it != NULL) {
237                 result = iops->get(env, it, (const void *)"");
238                 if (result > 0) {
239                         int i;
240                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
241                                 result = iops->next(env, it);
242                         if (result == 0)
243                                 result = -ENOTEMPTY;
244                         else if (result == +1)
245                                 result = 0;
246                 } else if (result == 0)
247                         /*
248                          * Huh? Index contains no zero key?
249                          */
250                         result = -EIO;
251
252                 iops->put(env, it);
253                 iops->fini(env, it);
254         } else
255                 result = -ENOMEM;
256         RETURN(result);
257 }
258
259 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
260 {
261         struct mdd_device *m = mdd_obj2mdd_dev(obj);
262         struct lu_attr *la = &mdd_env_info(env)->mti_la;
263         int rc;
264         ENTRY;
265
266         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
267         if (rc)
268                 RETURN(rc);
269
270         /*
271          * Subdir count limitation can be broken through.
272          */
273         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
274             !S_ISDIR(la->la_mode))
275                 RETURN(-EMLINK);
276         else
277                 RETURN(0);
278 }
279
280 /*
281  * Check whether it may create the cobj under the pobj.
282  * cobj maybe NULL
283  */
284 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
285                    struct mdd_object *cobj, int check_perm, int check_nlink)
286 {
287         int rc = 0;
288         ENTRY;
289
290         if (cobj && mdd_object_exists(cobj))
291                 RETURN(-EEXIST);
292
293         if (mdd_is_dead_obj(pobj))
294                 RETURN(-ENOENT);
295
296         if (check_perm)
297                 rc = mdd_permission_internal_locked(env, pobj, NULL,
298                                                     MAY_WRITE | MAY_EXEC,
299                                                     MOR_TGT_PARENT);
300
301         if (!rc && check_nlink)
302                 rc = __mdd_may_link(env, pobj);
303
304         RETURN(rc);
305 }
306
307 /*
308  * Check whether can unlink from the pobj in the case of "cobj == NULL".
309  */
310 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
311                    const struct md_attr *ma)
312 {
313         int rc;
314         ENTRY;
315
316         if (mdd_is_dead_obj(pobj))
317                 RETURN(-ENOENT);
318
319         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
320             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
321                 RETURN(-EPERM);
322
323         rc = mdd_permission_internal_locked(env, pobj, NULL,
324                                             MAY_WRITE | MAY_EXEC,
325                                             MOR_TGT_PARENT);
326         if (rc)
327                 RETURN(rc);
328
329         if (mdd_is_append(pobj))
330                 RETURN(-EPERM);
331
332         RETURN(rc);
333 }
334
335 /*
336  * pobj == NULL is remote ops case, under such case, pobj's
337  * VTX feature has been checked already, no need check again.
338  */
339 static inline int mdd_is_sticky(const struct lu_env *env,
340                                 struct mdd_object *pobj,
341                                 struct mdd_object *cobj)
342 {
343         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
344         struct md_ucred *uc = md_ucred(env);
345         int rc;
346
347         if (pobj) {
348                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
349                 if (rc)
350                         return rc;
351
352                 if (!(tmp_la->la_mode & S_ISVTX) ||
353                      (tmp_la->la_uid == uc->mu_fsuid))
354                         return 0;
355         }
356
357         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
358         if (rc)
359                 return rc;
360
361         if (tmp_la->la_uid == uc->mu_fsuid)
362                 return 0;
363
364         return !mdd_capable(uc, CFS_CAP_FOWNER);
365 }
366
367 /*
368  * Check whether it may delete the cobj from the pobj.
369  * pobj maybe NULL
370  */
371 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
372                    struct mdd_object *cobj, struct md_attr *ma,
373                    int check_perm, int check_empty)
374 {
375         int rc = 0;
376         ENTRY;
377
378         LASSERT(cobj);
379         if (!mdd_object_exists(cobj))
380                 RETURN(-ENOENT);
381
382         if (pobj) {
383                 if (mdd_is_dead_obj(pobj))
384                         RETURN(-ENOENT);
385
386                 if (check_perm) {
387                         rc = mdd_permission_internal_locked(env, pobj, NULL,
388                                                     MAY_WRITE | MAY_EXEC,
389                                                     MOR_TGT_PARENT);
390                         if (rc)
391                                 RETURN(rc);
392                 }
393
394                 if (mdd_is_append(pobj))
395                         RETURN(-EPERM);
396         }
397
398         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
399             mdd_is_sticky(env, pobj, cobj))
400                 RETURN(-EPERM);
401
402         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
403                 RETURN(-EPERM);
404
405         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
406             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
407                 RETURN(-EPERM);
408
409         if (S_ISDIR(ma->ma_attr.la_mode)) {
410                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
411
412                 if (!S_ISDIR(mdd_object_type(cobj)))
413                         RETURN(-ENOTDIR);
414
415                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
416                         RETURN(-EBUSY);
417         } else if (S_ISDIR(mdd_object_type(cobj)))
418                 RETURN(-EISDIR);
419
420         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
421                 rc = mdd_dir_is_empty(env, cobj);
422
423         RETURN(rc);
424 }
425
426 /*
427  * tgt maybe NULL
428  * has mdd_write_lock on src already, but not on tgt yet
429  */
430 int mdd_link_sanity_check(const struct lu_env *env,
431                           struct mdd_object *tgt_obj,
432                           const struct lu_name *lname,
433                           struct mdd_object *src_obj)
434 {
435         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
436         int rc = 0;
437         ENTRY;
438
439         /* Local ops, no lookup before link, check filename length here. */
440         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
441                 RETURN(-ENAMETOOLONG);
442
443         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
444                 RETURN(-EPERM);
445
446         if (S_ISDIR(mdd_object_type(src_obj)))
447                 RETURN(-EPERM);
448
449         LASSERT(src_obj != tgt_obj);
450         if (tgt_obj) {
451                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
452                 if (rc)
453                         RETURN(rc);
454         }
455
456         rc = __mdd_may_link(env, src_obj);
457
458         RETURN(rc);
459 }
460
461 /**
462  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
463  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
464  * (maybe too large to be represented). It is a trick to break through the
465  * "i_nlink" limitation for subdir count.
466  */
467 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
468                    struct thandle *handle)
469 {
470         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
471         struct mdd_device *m = mdd_obj2mdd_dev(obj);
472
473         if (!mdd_is_mnlink(obj)) {
474                 if (S_ISDIR(mdd_object_type(obj))) {
475                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
476                                 return;
477
478                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
479                                 obj->mod_flags |= MNLINK_OBJ;
480                                 tmp_la->la_nlink = 1;
481                                 tmp_la->la_valid = LA_NLINK;
482                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
483                                                       0);
484                                 return;
485                         }
486                 }
487                 mdo_ref_add(env, obj, handle);
488         }
489 }
490
491 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
492                    struct thandle *handle, int is_dot)
493 {
494         if (!mdd_is_mnlink(obj) || is_dot)
495                 mdo_ref_del(env, obj, handle);
496 }
497
498 /* insert named index, add reference if isdir */
499 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
500                               const struct lu_fid *lf, const char *name, int is_dir,
501                               struct thandle *handle, struct lustre_capa *capa)
502 {
503         struct dt_object *next = mdd_object_child(pobj);
504         int               rc;
505         ENTRY;
506
507         if (dt_try_as_dir(env, next)) {
508                 struct md_ucred  *uc = md_ucred(env);
509
510                 rc = next->do_index_ops->dio_insert(env, next,
511                                                     __mdd_fid_rec(env, lf),
512                                                     (const struct dt_key *)name,
513                                                     handle, capa, uc->mu_cap &
514                                                     CFS_CAP_SYS_RESOURCE_MASK);
515         } else {
516                 rc = -ENOTDIR;
517         }
518
519         if (rc == 0) {
520                 if (is_dir) {
521                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
522                         __mdd_ref_add(env, pobj, handle);
523                         mdd_write_unlock(env, pobj);
524                 }
525         }
526         RETURN(rc);
527 }
528
529 /* delete named index, drop reference if isdir */
530 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
531                               const char *name, int is_dir, struct thandle *handle,
532                               struct lustre_capa *capa)
533 {
534         struct dt_object *next = mdd_object_child(pobj);
535         int               rc;
536         ENTRY;
537
538         if (dt_try_as_dir(env, next)) {
539                 rc = next->do_index_ops->dio_delete(env, next,
540                                                     (struct dt_key *)name,
541                                                     handle, capa);
542                 if (rc == 0 && is_dir) {
543                         int is_dot = 0;
544
545                         if (name != NULL && name[0] == '.' && name[1] == 0)
546                                 is_dot = 1;
547                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
548                         __mdd_ref_del(env, pobj, handle, is_dot);
549                         mdd_write_unlock(env, pobj);
550                 }
551         } else
552                 rc = -ENOTDIR;
553
554         RETURN(rc);
555 }
556
557 static int
558 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
559                         const struct lu_fid *lf, const char *name,
560                         struct thandle *handle, struct lustre_capa *capa)
561 {
562         struct dt_object *next = mdd_object_child(pobj);
563         int               rc;
564         ENTRY;
565
566         if (dt_try_as_dir(env, next)) {
567                 struct md_ucred  *uc = md_ucred(env);
568
569                 rc = next->do_index_ops->dio_insert(env, next,
570                                                     __mdd_fid_rec(env, lf),
571                                                     (const struct dt_key *)name,
572                                                     handle, capa, uc->mu_cap &
573                                                     CFS_CAP_SYS_RESOURCE_MASK);
574         } else {
575                 rc = -ENOTDIR;
576         }
577         RETURN(rc);
578 }
579
580 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
581                     struct md_object *src_obj, const struct lu_name *lname,
582                     struct md_attr *ma)
583 {
584         const char *name = lname->ln_name;
585         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
586         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
587         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
588         struct mdd_device *mdd = mdo2mdd(src_obj);
589         struct dynlock_handle *dlh;
590         struct thandle *handle;
591 #ifdef HAVE_QUOTA_SUPPORT
592         struct obd_device *obd = mdd->mdd_obd_dev;
593         struct mds_obd *mds = &obd->u.mds;
594         unsigned int qids[MAXQUOTAS] = { 0, 0 };
595         int quota_opc = 0, rec_pending = 0;
596 #endif
597         int rc;
598         ENTRY;
599
600 #ifdef HAVE_QUOTA_SUPPORT
601         if (mds->mds_quota) {
602                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
603
604                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
605                 if (!rc) {
606                         quota_opc = FSFILT_OP_LINK;
607                         mdd_quota_wrapper(la_tmp, qids);
608                         /* get block quota for parent */
609                         lquota_chkquota(mds_quota_interface_ref, obd,
610                                         qids[USRQUOTA], qids[GRPQUOTA], 1,
611                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK);
612                 }
613         }
614 #endif
615
616         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
617         handle = mdd_trans_start(env, mdd);
618         if (IS_ERR(handle))
619                 GOTO(out_pending, rc = PTR_ERR(handle));
620
621         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
622         if (dlh == NULL)
623                 GOTO(out_trans, rc = -ENOMEM);
624         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
625
626         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
627         if (rc)
628                 GOTO(out_unlock, rc);
629
630         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
631                                      name, handle,
632                                      mdd_object_capa(env, mdd_tobj));
633         if (rc)
634                 GOTO(out_unlock, rc);
635
636         __mdd_ref_add(env, mdd_sobj, handle);
637
638         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
639         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
640
641         la->la_valid = LA_CTIME | LA_MTIME;
642         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
643         if (rc)
644                 GOTO(out_unlock, rc);
645
646         la->la_valid = LA_CTIME;
647         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
648         EXIT;
649 out_unlock:
650         mdd_write_unlock(env, mdd_sobj);
651         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
652 out_trans:
653         mdd_trans_stop(env, mdd, rc, handle);
654 out_pending:
655 #ifdef HAVE_QUOTA_SUPPORT
656         if (quota_opc) {
657                 if (rec_pending)
658                         lquota_pending_commit(mds_quota_interface_ref, obd,
659                                               qids[USRQUOTA], qids[GRPQUOTA],
660                                               1, 1);
661                 /* Trigger dqacq for the parent owner. If failed,
662                  * the next call for lquota_chkquota will process it. */
663                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
664                               quota_opc);
665         }
666 #endif
667         return rc;
668 }
669
670 /* caller should take a lock before calling */
671 int mdd_finish_unlink(const struct lu_env *env,
672                       struct mdd_object *obj, struct md_attr *ma,
673                       struct thandle *th)
674 {
675         int rc;
676         int reset = 1;
677         ENTRY;
678
679         rc = mdd_iattr_get(env, obj, ma);
680         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
681                 /* add new orphan and the object
682                  * will be deleted during mdd_close() */
683                 if (obj->mod_count) {
684                         rc = __mdd_orphan_add(env, obj, th);
685                         if (rc == 0)
686                                 obj->mod_flags |= ORPHAN_OBJ;
687                 }
688
689                 obj->mod_flags |= DEAD_OBJ;
690                 if (!(obj->mod_flags & ORPHAN_OBJ)) {
691                         rc = mdd_object_kill(env, obj, ma);
692                         if (rc == 0)
693                                 reset = 0;
694                 }
695
696         }
697         if (reset)
698                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
699
700         RETURN(rc);
701 }
702
703 /*
704  * pobj maybe NULL
705  * has mdd_write_lock on cobj already, but not on pobj yet
706  */
707 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
708                             struct mdd_object *cobj, struct md_attr *ma)
709 {
710         int rc;
711         ENTRY;
712
713         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
714
715         RETURN(rc);
716 }
717
718 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
719                       struct md_object *cobj, const struct lu_name *lname,
720                       struct md_attr *ma)
721 {
722         const char *name = lname->ln_name;
723         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
724         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
725         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
726         struct mdd_device *mdd = mdo2mdd(pobj);
727         struct dynlock_handle *dlh;
728         struct thandle    *handle;
729 #ifdef HAVE_QUOTA_SUPPORT
730         struct obd_device *obd = mdd->mdd_obd_dev;
731         struct mds_obd *mds = &obd->u.mds;
732         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
733         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
734         int quota_opc = 0;
735 #endif
736         int rc, is_dir;
737         ENTRY;
738
739         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
740                  PFID(mdd_object_fid(mdd_cobj)));
741
742         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
743         if (rc)
744                 RETURN(rc);
745
746         handle = mdd_trans_start(env, mdd);
747         if (IS_ERR(handle))
748                 RETURN(PTR_ERR(handle));
749
750
751         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
752         if (dlh == NULL)
753                 GOTO(out_trans, rc = -ENOMEM);
754         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
755
756         is_dir = S_ISDIR(ma->ma_attr.la_mode);
757         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
758         if (rc)
759                 GOTO(cleanup, rc);
760
761         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
762                                 mdd_object_capa(env, mdd_pobj));
763         if (rc)
764                 GOTO(cleanup, rc);
765
766         __mdd_ref_del(env, mdd_cobj, handle, 0);
767         if (is_dir)
768                 /* unlink dot */
769                 __mdd_ref_del(env, mdd_cobj, handle, 1);
770
771         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
772         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
773
774         la->la_valid = LA_CTIME | LA_MTIME;
775         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
776         if (rc)
777                 GOTO(cleanup, rc);
778
779         la->la_valid = LA_CTIME;
780         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
781         if (rc)
782                 GOTO(cleanup, rc);
783
784         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
785 #ifdef HAVE_QUOTA_SUPPORT
786         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
787             ma->ma_attr.la_nlink == 0) {
788                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
789
790                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
791                 if (!rc) {
792                         mdd_quota_wrapper(la_tmp, qpids);
793                         if (mdd_cobj->mod_count == 0) {
794                                 quota_opc = FSFILT_OP_UNLINK;
795                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
796                         } else {
797                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
798                         }
799                 }
800         }
801 #endif
802
803         if (rc == 0)
804                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
805                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
806                                    NULL, NULL);
807         EXIT;
808 cleanup:
809         mdd_write_unlock(env, mdd_cobj);
810         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
811 out_trans:
812         mdd_trans_stop(env, mdd, rc, handle);
813 #ifdef HAVE_QUOTA_SUPPORT
814         if (quota_opc)
815                 /* Trigger dqrel on the owner of child and parent. If failed,
816                  * the next call for lquota_chkquota will process it. */
817                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
818                               quota_opc);
819 #endif
820         return rc;
821 }
822
823 /* has not lock on pobj yet */
824 static int mdd_ni_sanity_check(const struct lu_env *env,
825                                struct md_object *pobj,
826                                const struct md_attr *ma)
827 {
828         struct mdd_object *obj = md2mdd_obj(pobj);
829         int rc;
830         ENTRY;
831
832         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
833                 RETURN(0);
834
835         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
836
837         RETURN(rc);
838 }
839
840 /*
841  * Partial operation.
842  */
843 static int mdd_name_insert(const struct lu_env *env,
844                            struct md_object *pobj,
845                            const struct lu_name *lname,
846                            const struct lu_fid *fid,
847                            const struct md_attr *ma)
848 {
849         const char *name = lname->ln_name;
850         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
851         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
852         struct mdd_device *mdd = mdo2mdd(pobj);
853         struct dynlock_handle *dlh;
854         struct thandle *handle;
855         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
856 #ifdef HAVE_QUOTA_SUPPORT
857         struct md_ucred *uc = md_ucred(env);
858         struct obd_device *obd = mdd->mdd_obd_dev;
859         struct mds_obd *mds = &obd->u.mds;
860         unsigned int qids[MAXQUOTAS] = { 0, 0 };
861         int quota_opc = 0, rec_pending = 0;
862         cfs_cap_t save = uc->mu_cap;
863 #endif
864         int rc;
865         ENTRY;
866
867 #ifdef HAVE_QUOTA_SUPPORT
868         if (mds->mds_quota) {
869                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
870                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
871
872                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
873                         if (!rc) {
874                                 quota_opc = FSFILT_OP_LINK;
875                                 mdd_quota_wrapper(la_tmp, qids);
876                                 /* get block quota for parent */
877                                 lquota_chkquota(mds_quota_interface_ref, obd,
878                                                 qids[USRQUOTA], qids[GRPQUOTA],
879                                                 1, &rec_pending, NULL,
880                                                 LQUOTA_FLAGS_BLK);
881                         }
882                 } else {
883                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
884                 }
885         }
886 #endif
887         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
888         handle = mdd_trans_start(env, mdo2mdd(pobj));
889         if (IS_ERR(handle))
890                 GOTO(out_pending, rc = PTR_ERR(handle));
891
892         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
893         if (dlh == NULL)
894                 GOTO(out_trans, rc = -ENOMEM);
895
896         rc = mdd_ni_sanity_check(env, pobj, ma);
897         if (rc)
898                 GOTO(out_unlock, rc);
899
900         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
901                                 handle, BYPASS_CAPA);
902         if (rc)
903                 GOTO(out_unlock, rc);
904
905         /*
906          * For some case, no need update obj's ctime (LA_CTIME is not set),
907          * e.g. split_dir.
908          * For other cases, update obj's ctime (LA_CTIME is set),
909          * e.g. cmr_link.
910          */
911         if (ma->ma_attr.la_valid & LA_CTIME) {
912                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
913                 la->la_valid = LA_CTIME | LA_MTIME;
914                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
915                                                         handle, 0);
916         }
917         EXIT;
918 out_unlock:
919         mdd_pdo_write_unlock(env, mdd_obj, dlh);
920 out_trans:
921         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
922 out_pending:
923 #ifdef HAVE_QUOTA_SUPPORT
924         if (mds->mds_quota) {
925                 if (quota_opc) {
926                         if (rec_pending)
927                                 lquota_pending_commit(mds_quota_interface_ref,
928                                                       obd, qids[USRQUOTA],
929                                                       qids[GRPQUOTA], 1, 1);
930                         /* Trigger dqacq for the parent owner. If failed,
931                          * the next call for lquota_chkquota will process it*/
932                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
933                                       rc, quota_opc);
934                 } else {
935                         uc->mu_cap = save;
936                 }
937         }
938 #endif
939         return rc;
940 }
941
942 /* has not lock on pobj yet */
943 static int mdd_nr_sanity_check(const struct lu_env *env,
944                                struct md_object *pobj,
945                                const struct md_attr *ma)
946 {
947         struct mdd_object *obj = md2mdd_obj(pobj);
948         int rc;
949         ENTRY;
950
951         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
952                 RETURN(0);
953
954         rc = mdd_may_unlink(env, obj, ma);
955
956         RETURN(rc);
957 }
958
959 /*
960  * Partial operation.
961  */
962 static int mdd_name_remove(const struct lu_env *env,
963                            struct md_object *pobj,
964                            const struct lu_name *lname,
965                            const struct md_attr *ma)
966 {
967         const char *name = lname->ln_name;
968         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
969         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
970         struct mdd_device *mdd = mdo2mdd(pobj);
971         struct dynlock_handle *dlh;
972         struct thandle *handle;
973         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
974 #ifdef HAVE_QUOTA_SUPPORT
975         struct obd_device *obd = mdd->mdd_obd_dev;
976         struct mds_obd *mds = &obd->u.mds;
977         unsigned int qids[MAXQUOTAS] = { 0, 0 };
978         int quota_opc = 0;
979 #endif
980         int rc;
981         ENTRY;
982
983 #ifdef HAVE_QUOTA_SUPPORT
984         if (mds->mds_quota) {
985                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
986
987                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
988                 if (!rc) {
989                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
990                         mdd_quota_wrapper(la_tmp, qids);
991                 }
992         }
993 #endif
994         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
995         handle = mdd_trans_start(env, mdd);
996         if (IS_ERR(handle))
997                 GOTO(out_pending, rc = PTR_ERR(handle));
998
999         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1000         if (dlh == NULL)
1001                 GOTO(out_trans, rc = -ENOMEM);
1002
1003         rc = mdd_nr_sanity_check(env, pobj, ma);
1004         if (rc)
1005                 GOTO(out_unlock, rc);
1006
1007         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1008                                 handle, BYPASS_CAPA);
1009         if (rc)
1010                 GOTO(out_unlock, rc);
1011
1012         /*
1013          * For some case, no need update obj's ctime (LA_CTIME is not set),
1014          * e.g. split_dir.
1015          * For other cases, update obj's ctime (LA_CTIME is set),
1016          * e.g. cmr_unlink.
1017          */
1018         if (ma->ma_attr.la_valid & LA_CTIME) {
1019                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1020                 la->la_valid = LA_CTIME | LA_MTIME;
1021                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1022                                                         handle, 0);
1023         }
1024         EXIT;
1025 out_unlock:
1026         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1027 out_trans:
1028         mdd_trans_stop(env, mdd, rc, handle);
1029 out_pending:
1030 #ifdef HAVE_QUOTA_SUPPORT
1031         /* Trigger dqrel for the parent owner.
1032          * If failed, the next call for lquota_chkquota will process it. */
1033         if (quota_opc)
1034                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1035                               quota_opc);
1036 #endif
1037         return rc;
1038 }
1039
1040 /*
1041  * tobj maybe NULL
1042  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1043  */
1044 static int mdd_rt_sanity_check(const struct lu_env *env,
1045                                struct mdd_object *tgt_pobj,
1046                                struct mdd_object *tobj,
1047                                struct md_attr *ma)
1048 {
1049         int rc;
1050         ENTRY;
1051
1052         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1053                 RETURN(0);
1054
1055         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1056          * exist. In fact, tobj must exist, otherwise the call trace will be:
1057          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1058          * When get here, tobj must be NOT NULL, the other case has been
1059          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1060          * MDS_PERM_BYPASS.
1061          * So check may_delete, but not check nlink of tgt_pobj. */
1062         LASSERT(tobj);
1063         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1064
1065         RETURN(rc);
1066 }
1067
1068 static int mdd_rename_tgt(const struct lu_env *env,
1069                           struct md_object *pobj, struct md_object *tobj,
1070                           const struct lu_fid *lf, const struct lu_name *lname,
1071                           struct md_attr *ma)
1072 {
1073         const char *name = lname->ln_name;
1074         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1075         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1076         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1077         struct mdd_device *mdd = mdo2mdd(pobj);
1078         struct dynlock_handle *dlh;
1079         struct thandle *handle;
1080 #ifdef HAVE_QUOTA_SUPPORT
1081         struct obd_device *obd = mdd->mdd_obd_dev;
1082         struct mds_obd *mds = &obd->u.mds;
1083         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1084         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1085         int quota_opc = 0, rec_pending = 0;
1086 #endif
1087         int rc;
1088         ENTRY;
1089
1090 #ifdef HAVE_QUOTA_SUPPORT
1091         if (mds->mds_quota && !tobj) {
1092                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1093
1094                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1095                 if (!rc) {
1096                         quota_opc = FSFILT_OP_LINK;
1097                         mdd_quota_wrapper(la_tmp, qpids);
1098                         /* get block quota for target parent */
1099                         lquota_chkquota(mds_quota_interface_ref, obd,
1100                                         qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1101                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK);
1102                 }
1103         }
1104 #endif
1105         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
1106         handle = mdd_trans_start(env, mdd);
1107         if (IS_ERR(handle))
1108                 GOTO(out_pending, rc = PTR_ERR(handle));
1109
1110         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1111         if (dlh == NULL)
1112                 GOTO(out_trans, rc = -ENOMEM);
1113         if (tobj)
1114                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1115
1116         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1117         if (rc)
1118                 GOTO(cleanup, rc);
1119
1120         /*
1121          * If rename_tgt is called then we should just re-insert name with
1122          * correct fid, no need to dec/inc parent nlink if obj is dir.
1123          */
1124         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1125         if (rc)
1126                 GOTO(cleanup, rc);
1127
1128         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1129                                      BYPASS_CAPA);
1130         if (rc)
1131                 GOTO(cleanup, rc);
1132
1133         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1134         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1135
1136         la->la_valid = LA_CTIME | LA_MTIME;
1137         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1138         if (rc)
1139                 GOTO(cleanup, rc);
1140
1141         /*
1142          * For tobj is remote case cmm layer has processed
1143          * and pass NULL tobj to here. So when tobj is NOT NULL,
1144          * it must be local one.
1145          */
1146         if (tobj && mdd_object_exists(mdd_tobj)) {
1147                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1148
1149                 /* Remove dot reference. */
1150                 if (S_ISDIR(ma->ma_attr.la_mode))
1151                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1152
1153                 la->la_valid = LA_CTIME;
1154                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1155                 if (rc)
1156                         GOTO(cleanup, rc);
1157
1158                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1159                 if (rc)
1160                         GOTO(cleanup, rc);
1161
1162 #ifdef HAVE_QUOTA_SUPPORT
1163                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1164                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1165                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1166                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1167                 }
1168 #endif
1169         }
1170         EXIT;
1171 cleanup:
1172         if (tobj)
1173                 mdd_write_unlock(env, mdd_tobj);
1174         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1175 out_trans:
1176         mdd_trans_stop(env, mdd, rc, handle);
1177 out_pending:
1178 #ifdef HAVE_QUOTA_SUPPORT
1179         if (mds->mds_quota) {
1180                 if (rec_pending)
1181                         lquota_pending_commit(mds_quota_interface_ref, obd,
1182                                               qpids[USRQUOTA],
1183                                               qpids[GRPQUOTA],
1184                                               1, 1);
1185                 if (quota_opc)
1186                         /* Trigger dqrel/dqacq on the target owner of child and
1187                          * parent. If failed, the next call for lquota_chkquota
1188                          * will process it. */
1189                         lquota_adjust(mds_quota_interface_ref, obd, qcids,
1190                                       qpids, rc, quota_opc);
1191         }
1192 #endif
1193         return rc;
1194 }
1195
1196 /*
1197  * The permission has been checked when obj created, no need check again.
1198  */
1199 static int mdd_cd_sanity_check(const struct lu_env *env,
1200                                struct mdd_object *obj)
1201 {
1202         ENTRY;
1203
1204         /* EEXIST check */
1205         if (!obj || mdd_is_dead_obj(obj))
1206                 RETURN(-ENOENT);
1207
1208         RETURN(0);
1209
1210 }
1211
1212 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1213                            struct md_object *cobj, const struct md_op_spec *spec,
1214                            struct md_attr *ma)
1215 {
1216         struct mdd_device *mdd = mdo2mdd(cobj);
1217         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1218         struct mdd_object *son = md2mdd_obj(cobj);
1219         struct lu_attr    *attr = &ma->ma_attr;
1220         struct lov_mds_md *lmm = NULL;
1221         int                lmm_size = 0;
1222         struct thandle    *handle;
1223         int                rc;
1224         ENTRY;
1225
1226         rc = mdd_cd_sanity_check(env, son);
1227         if (rc)
1228                 RETURN(rc);
1229
1230         if (!md_should_create(spec->sp_cr_flags))
1231                 RETURN(0);
1232
1233         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1234                             spec, attr);
1235         if (rc)
1236                 RETURN(rc);
1237
1238         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1239         handle = mdd_trans_start(env, mdd);
1240         if (IS_ERR(handle))
1241                 GOTO(out_free, rc = PTR_ERR(handle));
1242
1243         /*
1244          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1245          * Should this be fixed?
1246          */
1247
1248         /* Replay creates has objects already */
1249 #if 0
1250         if (spec->no_create) {
1251                 CDEBUG(D_INFO, "we already have lov ea\n");
1252                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1253                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1254                                     spec->u.sp_ea.eadatalen, handle, 0);
1255         } else
1256 #endif
1257                 /* No need mdd_lsm_sanity_check here */
1258                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1259                                     lmm_size, handle, 0);
1260
1261         if (rc == 0)
1262                rc = mdd_attr_get_internal_locked(env, son, ma);
1263
1264         /* update lov_objid data, must be before transaction stop! */
1265         if (rc == 0)
1266                 mdd_lov_objid_update(mdd, lmm);
1267
1268         mdd_trans_stop(env, mdd, rc, handle);
1269 out_free:
1270         /* Finish mdd_lov_create() stuff. */
1271         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1272         RETURN(rc);
1273 }
1274
1275 static int
1276 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1277              const struct lu_name *lname, struct lu_fid* fid, int mask)
1278 {
1279         const char          *name = lname->ln_name;
1280         const struct dt_key *key = (const struct dt_key *)name;
1281         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1282         struct mdd_device   *m = mdo2mdd(pobj);
1283         struct dt_object    *dir = mdd_object_child(mdd_obj);
1284         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1285         int rc;
1286         ENTRY;
1287
1288         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1289                 RETURN(-ESTALE);
1290
1291         rc = mdd_object_exists(mdd_obj);
1292         if (unlikely(rc == 0))
1293                 RETURN(-ESTALE);
1294         else if (unlikely(rc < 0)) {
1295                 CERROR("Object "DFID" locates on remote server\n",
1296                         PFID(mdo2fid(mdd_obj)));
1297                 LBUG();
1298         }
1299
1300         /* The common filename length check. */
1301         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1302                 RETURN(-ENAMETOOLONG);
1303
1304         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1305                                             MOR_TGT_PARENT);
1306         if (rc)
1307                 RETURN(rc);
1308
1309         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1310                    dt_try_as_dir(env, dir))) {
1311                 rc = dir->do_index_ops->dio_lookup(env, dir,
1312                                                  (struct dt_rec *)pack, key,
1313                                                  mdd_object_capa(env, mdd_obj));
1314                 if (rc > 0)
1315                         rc = fid_unpack(pack, fid);
1316                 else if (rc == 0)
1317                         rc = -ENOENT;
1318         } else
1319                 rc = -ENOTDIR;
1320
1321         RETURN(rc);
1322 }
1323
1324 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1325                           struct mdd_object *child, struct md_attr *ma,
1326                           struct thandle *handle, const struct md_op_spec *spec)
1327 {
1328         int rc;
1329         ENTRY;
1330
1331         /*
1332          * Update attributes for child.
1333          *
1334          * FIXME:
1335          *  (1) the valid bits should be converted between Lustre and Linux;
1336          *  (2) maybe, the child attributes should be set in OSD when creation.
1337          */
1338
1339         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1340         if (rc != 0)
1341                 RETURN(rc);
1342
1343         if (S_ISDIR(ma->ma_attr.la_mode)) {
1344                 /* Add "." and ".." for newly created dir */
1345                 __mdd_ref_add(env, child, handle);
1346                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1347                                              dot, handle, BYPASS_CAPA);
1348                 if (rc == 0) {
1349                         rc = __mdd_index_insert_only(env, child, pfid,
1350                                                      dotdot, handle,
1351                                                      BYPASS_CAPA);
1352                         if (rc != 0) {
1353                                 int rc2;
1354
1355                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1356                                                          handle, BYPASS_CAPA);
1357                                 if (rc2 != 0)
1358                                         CERROR("Failure to cleanup after dotdot"
1359                                                " creation: %d (%d)\n", rc2, rc);
1360                         }
1361                 }
1362         }
1363         RETURN(rc);
1364 }
1365
1366 /* has not lock on pobj yet */
1367 static int mdd_create_sanity_check(const struct lu_env *env,
1368                                    struct md_object *pobj,
1369                                    const struct lu_name *lname,
1370                                    struct md_attr *ma,
1371                                    struct md_op_spec *spec)
1372 {
1373         struct mdd_thread_info *info = mdd_env_info(env);
1374         struct lu_attr    *la        = &info->mti_la;
1375         struct lu_fid     *fid       = &info->mti_fid;
1376         struct mdd_object *obj       = md2mdd_obj(pobj);
1377         struct mdd_device *m         = mdo2mdd(pobj);
1378         int lookup                   = spec->sp_cr_lookup;
1379         int rc;
1380         ENTRY;
1381
1382         /* EEXIST check */
1383         if (mdd_is_dead_obj(obj))
1384                 RETURN(-ENOENT);
1385
1386         /*
1387          * In some cases this lookup is not needed - we know before if name
1388          * exists or not because MDT performs lookup for it.
1389          * name length check is done in lookup.
1390          */
1391         if (lookup) {
1392                 /*
1393                  * Check if the name already exist, though it will be checked in
1394                  * _index_insert also, for avoiding rolling back if exists
1395                  * _index_insert.
1396                  */
1397                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1398                                          MAY_WRITE | MAY_EXEC);
1399                 if (rc != -ENOENT)
1400                         RETURN(rc ? : -EEXIST);
1401         } else {
1402                 /*
1403                  * Check WRITE permission for the parent.
1404                  * EXEC permission have been checked
1405                  * when lookup before create already.
1406                  */
1407                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1408                                                     MOR_TGT_PARENT);
1409                 if (rc)
1410                         RETURN(rc);
1411         }
1412
1413         /* sgid check */
1414         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1415         if (rc != 0)
1416                 RETURN(rc);
1417
1418         if (la->la_mode & S_ISGID) {
1419                 ma->ma_attr.la_gid = la->la_gid;
1420                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1421                         ma->ma_attr.la_mode |= S_ISGID;
1422                         ma->ma_attr.la_valid |= LA_MODE;
1423                 }
1424         }
1425
1426         switch (ma->ma_attr.la_mode & S_IFMT) {
1427         case S_IFLNK: {
1428                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1429
1430                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1431                         RETURN(-ENAMETOOLONG);
1432                 else
1433                         RETURN(0);
1434         }
1435         case S_IFDIR:
1436         case S_IFREG:
1437         case S_IFCHR:
1438         case S_IFBLK:
1439         case S_IFIFO:
1440         case S_IFSOCK:
1441                 rc = 0;
1442                 break;
1443         default:
1444                 rc = -EINVAL;
1445                 break;
1446         }
1447         RETURN(rc);
1448 }
1449
1450 /*
1451  * Create object and insert it into namespace.
1452  */
1453 static int mdd_create(const struct lu_env *env,
1454                       struct md_object *pobj,
1455                       const struct lu_name *lname,
1456                       struct md_object *child,
1457                       struct md_op_spec *spec,
1458                       struct md_attr* ma)
1459 {
1460         struct mdd_thread_info *info = mdd_env_info(env);
1461         struct lu_attr         *la = &info->mti_la_for_fix;
1462         struct md_attr         *ma_acl = &info->mti_ma;
1463         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1464         struct mdd_object      *son = md2mdd_obj(child);
1465         struct mdd_device      *mdd = mdo2mdd(pobj);
1466         struct lu_attr         *attr = &ma->ma_attr;
1467         struct lov_mds_md      *lmm = NULL;
1468         struct thandle         *handle;
1469         struct dynlock_handle  *dlh;
1470         const char             *name = lname->ln_name;
1471         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1472         int got_def_acl = 0;
1473 #ifdef HAVE_QUOTA_SUPPORT
1474         struct obd_device *obd = mdd->mdd_obd_dev;
1475         struct mds_obd *mds = &obd->u.mds;
1476         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1477         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1478         int quota_opc = 0, block_count = 0;
1479         int inode_pending = 0, block_pending = 0, parent_pending = 0;
1480 #endif
1481         ENTRY;
1482
1483         /*
1484          * Two operations have to be performed:
1485          *
1486          *  - an allocation of a new object (->do_create()), and
1487          *
1488          *  - an insertion into a parent index (->dio_insert()).
1489          *
1490          * Due to locking, operation order is not important, when both are
1491          * successful, *but* error handling cases are quite different:
1492          *
1493          *  - if insertion is done first, and following object creation fails,
1494          *  insertion has to be rolled back, but this operation might fail
1495          *  also leaving us with dangling index entry.
1496          *
1497          *  - if creation is done first, is has to be undone if insertion
1498          *  fails, leaving us with leaked space, which is neither good, nor
1499          *  fatal.
1500          *
1501          * It seems that creation-first is simplest solution, but it is
1502          * sub-optimal in the frequent
1503          *
1504          *         $ mkdir foo
1505          *         $ mkdir foo
1506          *
1507          * case, because second mkdir is bound to create object, only to
1508          * destroy it immediately.
1509          *
1510          * To avoid this follow local file systems that do double lookup:
1511          *
1512          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1513          *
1514          *     1. create            (mdd_object_create_internal())
1515          *
1516          *     2. insert            (__mdd_index_insert(), lookup again)
1517          */
1518
1519         /* Sanity checks before big job. */
1520         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1521         if (rc)
1522                 RETURN(rc);
1523
1524 #ifdef HAVE_QUOTA_SUPPORT
1525         if (mds->mds_quota) {
1526                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1527
1528                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1529                 if (!rc) {
1530                         int same = 0;
1531
1532                         quota_opc = FSFILT_OP_CREATE;
1533                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1534                         mdd_quota_wrapper(la_tmp, qpids);
1535                         /* get file quota for child */
1536                         lquota_chkquota(mds_quota_interface_ref, obd,
1537                                         qcids[USRQUOTA], qcids[GRPQUOTA], 1,
1538                                         &inode_pending, NULL, 0);
1539                         switch (ma->ma_attr.la_mode & S_IFMT) {
1540                         case S_IFLNK:
1541                         case S_IFDIR:
1542                                 block_count = 2;
1543                                 break;
1544                         case S_IFREG:
1545                                 block_count = 1;
1546                                 break;
1547                         }
1548                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1549                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1550                                 block_count += 1;
1551                                 same = 1;
1552                         }
1553                         /* get block quota for child and parent */
1554                         if (block_count)
1555                                 lquota_chkquota(mds_quota_interface_ref, obd,
1556                                                 qcids[USRQUOTA], qcids[GRPQUOTA],
1557                                                 block_count,
1558                                                 &block_pending, NULL,
1559                                                 LQUOTA_FLAGS_BLK);
1560                         if (!same)
1561                                 lquota_chkquota(mds_quota_interface_ref, obd,
1562                                                 qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1563                                                 &parent_pending, NULL,
1564                                                 LQUOTA_FLAGS_BLK);
1565                 }
1566         }
1567 #endif
1568
1569         /*
1570          * No RPC inside the transaction, so OST objects should be created at
1571          * first.
1572          */
1573         if (S_ISREG(attr->la_mode)) {
1574                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1575                                     spec, attr);
1576                 if (rc)
1577                         GOTO(out_pending, rc);
1578         }
1579
1580         if (!S_ISLNK(attr->la_mode)) {
1581                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1582                 ma_acl->ma_acl = info->mti_xattr_buf;
1583                 ma_acl->ma_need = MA_ACL_DEF;
1584                 ma_acl->ma_valid = 0;
1585
1586                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1587                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1588                 mdd_read_unlock(env, mdd_pobj);
1589                 if (rc)
1590                         GOTO(out_free, rc);
1591                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1592                         got_def_acl = 1;
1593         }
1594
1595         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1596         handle = mdd_trans_start(env, mdd);
1597         if (IS_ERR(handle))
1598                 GOTO(out_free, rc = PTR_ERR(handle));
1599
1600         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1601         if (dlh == NULL)
1602                 GOTO(out_trans, rc = -ENOMEM);
1603
1604         mdd_write_lock(env, son, MOR_TGT_CHILD);
1605         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
1606         if (rc) {
1607                 mdd_write_unlock(env, son);
1608                 GOTO(cleanup, rc);
1609         }
1610
1611         created = 1;
1612
1613 #ifdef CONFIG_FS_POSIX_ACL
1614         if (got_def_acl) {
1615                 struct lu_buf *acl_buf = &info->mti_buf;
1616                 acl_buf->lb_buf = ma_acl->ma_acl;
1617                 acl_buf->lb_len = ma_acl->ma_acl_size;
1618
1619                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1620                 if (rc) {
1621                         mdd_write_unlock(env, son);
1622                         GOTO(cleanup, rc);
1623                 } else {
1624                         ma->ma_attr.la_valid |= LA_MODE;
1625                 }
1626         }
1627 #endif
1628
1629         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1630                                    son, ma, handle, spec);
1631         mdd_write_unlock(env, son);
1632         if (rc)
1633                 /*
1634                  * Object has no links, so it will be destroyed when last
1635                  * reference is released. (XXX not now.)
1636                  */
1637                 GOTO(cleanup, rc);
1638
1639         initialized = 1;
1640
1641         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1642                                 name, S_ISDIR(attr->la_mode), handle,
1643                                 mdd_object_capa(env, mdd_pobj));
1644
1645         if (rc)
1646                 GOTO(cleanup, rc);
1647
1648         inserted = 1;
1649
1650         /* No need mdd_lsm_sanity_check here */
1651         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1652         if (rc) {
1653                 CERROR("error on stripe info copy %d \n", rc);
1654                 GOTO(cleanup, rc);
1655         }
1656         if (lmm && lmm_size > 0) {
1657                 /* Set Lov here, do not get lmm again later */
1658                 memcpy(ma->ma_lmm, lmm, lmm_size);
1659                 ma->ma_lmm_size = lmm_size;
1660                 ma->ma_valid |= MA_LOV;
1661         }
1662
1663         if (S_ISLNK(attr->la_mode)) {
1664                 struct md_ucred  *uc = md_ucred(env);
1665                 struct dt_object *dt = mdd_object_child(son);
1666                 const char *target_name = spec->u.sp_symname;
1667                 int sym_len = strlen(target_name);
1668                 const struct lu_buf *buf;
1669                 loff_t pos = 0;
1670
1671                 buf = mdd_buf_get_const(env, target_name, sym_len);
1672                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1673                                                 mdd_object_capa(env, son),
1674                                                 uc->mu_cap &
1675                                                 CFS_CAP_SYS_RESOURCE_MASK);
1676
1677                 if (rc == sym_len)
1678                         rc = 0;
1679                 else
1680                         GOTO(cleanup, rc = -EFAULT);
1681         }
1682
1683         *la = ma->ma_attr;
1684         la->la_valid = LA_CTIME | LA_MTIME;
1685         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1686         if (rc)
1687                 GOTO(cleanup, rc);
1688
1689         /* Return attr back. */
1690         rc = mdd_attr_get_internal_locked(env, son, ma);
1691         EXIT;
1692 cleanup:
1693         if (rc && created) {
1694                 int rc2 = 0;
1695
1696                 if (inserted) {
1697                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1698                                                  S_ISDIR(attr->la_mode),
1699                                                  handle, BYPASS_CAPA);
1700                         if (rc2)
1701                                 CERROR("error can not cleanup destroy %d\n",
1702                                        rc2);
1703                 }
1704
1705                 if (rc2 == 0) {
1706                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1707                         __mdd_ref_del(env, son, handle, 0);
1708                         if (initialized && S_ISDIR(attr->la_mode))
1709                                 __mdd_ref_del(env, son, handle, 1);
1710                         mdd_write_unlock(env, son);
1711                 }
1712         }
1713
1714         /* update lov_objid data, must be before transaction stop! */
1715         if (rc == 0)
1716                 mdd_lov_objid_update(mdd, lmm);
1717
1718         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1719 out_trans:
1720         mdd_trans_stop(env, mdd, rc, handle);
1721 out_free:
1722         /* finis lov_create stuff, free all temporary data */
1723         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1724 out_pending:
1725 #ifdef HAVE_QUOTA_SUPPORT
1726         if (quota_opc) {
1727                 if (inode_pending)
1728                         lquota_pending_commit(mds_quota_interface_ref, obd,
1729                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1730                                               1, 0);
1731                 if (block_pending)
1732                         lquota_pending_commit(mds_quota_interface_ref, obd,
1733                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1734                                               block_count, 1);
1735                 if (parent_pending)
1736                         lquota_pending_commit(mds_quota_interface_ref, obd,
1737                                               qpids[USRQUOTA], qpids[GRPQUOTA],
1738                                               1, 1);
1739                 /* Trigger dqacq on the owner of child and parent. If failed,
1740                  * the next call for lquota_chkquota will process it. */
1741                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1742                               quota_opc);
1743         }
1744 #endif
1745         return rc;
1746 }
1747
1748 /*
1749  * Get locks on parents in proper order
1750  * RETURN: < 0 - error, rename_order if successful
1751  */
1752 enum rename_order {
1753         MDD_RN_SAME,
1754         MDD_RN_SRCTGT,
1755         MDD_RN_TGTSRC
1756 };
1757
1758 static int mdd_rename_order(const struct lu_env *env,
1759                             struct mdd_device *mdd,
1760                             struct mdd_object *src_pobj,
1761                             struct mdd_object *tgt_pobj)
1762 {
1763         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1764         int rc;
1765         ENTRY;
1766
1767         if (src_pobj == tgt_pobj)
1768                 RETURN(MDD_RN_SAME);
1769
1770         /* compared the parent child relationship of src_p&tgt_p */
1771         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1772                 rc = MDD_RN_SRCTGT;
1773         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1774                 rc = MDD_RN_TGTSRC;
1775         } else {
1776                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1777                 if (rc == -EREMOTE)
1778                         rc = 0;
1779
1780                 if (rc == 1)
1781                         rc = MDD_RN_TGTSRC;
1782                 else if (rc == 0)
1783                         rc = MDD_RN_SRCTGT;
1784         }
1785
1786         RETURN(rc);
1787 }
1788
1789 /* has not mdd_write{read}_lock on any obj yet. */
1790 static int mdd_rename_sanity_check(const struct lu_env *env,
1791                                    struct mdd_object *src_pobj,
1792                                    struct mdd_object *tgt_pobj,
1793                                    struct mdd_object *sobj,
1794                                    struct mdd_object *tobj,
1795                                    struct md_attr *ma)
1796 {
1797         int rc = 0;
1798         ENTRY;
1799
1800         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1801                 RETURN(0);
1802
1803         /* XXX: when get here, sobj must NOT be NULL,
1804          * the other case has been processed in cml_rename
1805          * before mdd_rename and enable MDS_PERM_BYPASS. */
1806         LASSERT(sobj);
1807         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1808         if (rc)
1809                 RETURN(rc);
1810
1811         /* XXX: when get here, "tobj == NULL" means tobj must
1812          * NOT exist (neither on remote MDS, such case has been
1813          * processed in cml_rename before mdd_rename and enable
1814          * MDS_PERM_BYPASS).
1815          * So check may_create, but not check may_unlink. */
1816         if (!tobj)
1817                 rc = mdd_may_create(env, tgt_pobj, NULL,
1818                                     (src_pobj != tgt_pobj), 0);
1819         else
1820                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1821                                     (src_pobj != tgt_pobj), 1);
1822
1823         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1824             S_ISDIR(ma->ma_attr.la_mode))
1825                 rc = __mdd_may_link(env, tgt_pobj);
1826
1827         RETURN(rc);
1828 }
1829
1830 /* src object can be remote that is why we use only fid and type of object */
1831 static int mdd_rename(const struct lu_env *env,
1832                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1833                       const struct lu_fid *lf, const struct lu_name *lsname,
1834                       struct md_object *tobj, const struct lu_name *ltname,
1835                       struct md_attr *ma)
1836 {
1837         const char *sname = lsname->ln_name;
1838         const char *tname = ltname->ln_name;
1839         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1840         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1841         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1842         struct mdd_device *mdd = mdo2mdd(src_pobj);
1843         struct mdd_object *mdd_sobj = NULL;
1844         struct mdd_object *mdd_tobj = NULL;
1845         struct dynlock_handle *sdlh, *tdlh;
1846         struct thandle *handle;
1847         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
1848         int is_dir;
1849         int rc;
1850
1851 #ifdef HAVE_QUOTA_SUPPORT
1852         struct obd_device *obd = mdd->mdd_obd_dev;
1853         struct mds_obd *mds = &obd->u.mds;
1854         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
1855         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
1856         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
1857         int quota_opc = 0, rec_pending = 0;
1858 #endif
1859         ENTRY;
1860
1861         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1862         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1863
1864         if (tobj)
1865                 mdd_tobj = md2mdd_obj(tobj);
1866
1867 #ifdef HAVE_QUOTA_SUPPORT
1868         if (mds->mds_quota) {
1869                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1870
1871                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
1872                 if (!rc) {
1873                         mdd_quota_wrapper(la_tmp, qspids);
1874                         if (!tobj) {
1875                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
1876                                                 BYPASS_CAPA);
1877                                 if (!rc) {
1878                                         quota_opc = FSFILT_OP_LINK;
1879                                         mdd_quota_wrapper(la_tmp, qtpids);
1880                                         /* get block quota for target parent */
1881                                         lquota_chkquota(mds_quota_interface_ref,
1882                                                         obd, qtpids[USRQUOTA],
1883                                                         qtpids[GRPQUOTA], 1,
1884                                                         &rec_pending, NULL,
1885                                                         LQUOTA_FLAGS_BLK);
1886                                 }
1887                         }
1888                 }
1889         }
1890 #endif
1891         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1892         handle = mdd_trans_start(env, mdd);
1893         if (IS_ERR(handle))
1894                 GOTO(out_pending, rc = PTR_ERR(handle));
1895
1896         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1897         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1898         if (rc < 0)
1899                 GOTO(cleanup_unlocked, rc);
1900
1901         /* Get locks in determined order */
1902         if (rc == MDD_RN_SAME) {
1903                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
1904                                           sname, MOR_SRC_PARENT);
1905                 /* check hashes to determine do we need one lock or two */
1906                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1907                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
1908                                 MOR_TGT_PARENT);
1909                 else
1910                         tdlh = sdlh;
1911         } else if (rc == MDD_RN_SRCTGT) {
1912                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
1913                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
1914         } else {
1915                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
1916                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
1917         }
1918         if (sdlh == NULL || tdlh == NULL)
1919                 GOTO(cleanup, rc = -ENOMEM);
1920
1921         mdd_sobj = mdd_object_find(env, mdd, lf);
1922         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1923                                      mdd_sobj, mdd_tobj, ma);
1924         if (rc)
1925                 GOTO(cleanup, rc);
1926
1927         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1928                                 mdd_object_capa(env, mdd_spobj));
1929         if (rc)
1930                 GOTO(cleanup, rc);
1931
1932         /* "mv dir1 dir2" needs "dir1/.." link update */
1933         if (is_dir) {
1934                 rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
1935                                         mdd_object_capa(env, mdd_spobj));
1936                 if (rc)
1937                        GOTO(cleanup, rc);
1938
1939                 rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
1940                                         is_dir, handle,
1941                                         mdd_object_capa(env, mdd_tpobj));
1942                 if (rc)
1943                         GOTO(cleanup, rc);
1944         }
1945
1946         /*
1947          * Here tobj can be remote one, so we do index_delete unconditionally
1948          * and -ENOENT is allowed.
1949          */
1950         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1951                                 mdd_object_capa(env, mdd_tpobj));
1952         if (rc != 0 && rc != -ENOENT)
1953                 GOTO(cleanup, rc);
1954
1955         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1956                                 mdd_object_capa(env, mdd_tpobj));
1957         if (rc)
1958                 GOTO(cleanup, rc);
1959
1960         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1961         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1962
1963         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
1964         if (mdd_sobj) {
1965                 la->la_valid = LA_CTIME;
1966                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
1967                                                         handle, 0);
1968                 if (rc)
1969                         GOTO(cleanup, rc);
1970         }
1971
1972         /*
1973          * For tobj is remote case cmm layer has processed
1974          * and set tobj to NULL then. So when tobj is NOT NULL,
1975          * it must be local one.
1976          */
1977         if (tobj && mdd_object_exists(mdd_tobj)) {
1978                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1979                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1980
1981                 /* Remove dot reference. */
1982                 if (is_dir)
1983                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1984
1985                 la->la_valid = LA_CTIME;
1986                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1987                 if (rc)
1988                         GOTO(cleanup, rc);
1989
1990                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1991                 mdd_write_unlock(env, mdd_tobj);
1992                 if (rc)
1993                         GOTO(cleanup, rc);
1994
1995 #ifdef HAVE_QUOTA_SUPPORT
1996                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1997                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1998                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1999                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
2000                 }
2001 #endif
2002         }
2003
2004         la->la_valid = LA_CTIME | LA_MTIME;
2005         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
2006         if (rc)
2007                 GOTO(cleanup, rc);
2008
2009         if (mdd_spobj != mdd_tpobj) {
2010                 la->la_valid = LA_CTIME | LA_MTIME;
2011                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
2012                                                   handle, 0);
2013         }
2014
2015         EXIT;
2016 cleanup:
2017         if (likely(tdlh) && sdlh != tdlh)
2018                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2019         if (likely(sdlh))
2020                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2021 cleanup_unlocked:
2022         mdd_trans_stop(env, mdd, rc, handle);
2023         if (mdd_sobj)
2024                 mdd_object_put(env, mdd_sobj);
2025 out_pending:
2026 #ifdef HAVE_QUOTA_SUPPORT
2027         if (mds->mds_quota) {
2028                 if (rec_pending)
2029                         lquota_pending_commit(mds_quota_interface_ref, obd,
2030                                               qtpids[USRQUOTA],
2031                                               qtpids[GRPQUOTA],
2032                                               1, 1);
2033                 /* Trigger dqrel on the source owner of parent.
2034                  * If failed, the next call for lquota_chkquota will
2035                  * process it. */
2036                 lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2037                               FSFILT_OP_UNLINK_PARTIAL_PARENT);
2038                 if (quota_opc)
2039                         /* Trigger dqrel/dqacq on the target owner of child and
2040                          * parent. If failed, the next call for lquota_chkquota
2041                          * will process it. */
2042                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2043                                       qtpids, rc, quota_opc);
2044         }
2045 #endif
2046         return rc;
2047 }
2048
2049 const struct md_dir_operations mdd_dir_ops = {
2050         .mdo_is_subdir     = mdd_is_subdir,
2051         .mdo_lookup        = mdd_lookup,
2052         .mdo_create        = mdd_create,
2053         .mdo_rename        = mdd_rename,
2054         .mdo_link          = mdd_link,
2055         .mdo_unlink        = mdd_unlink,
2056         .mdo_name_insert   = mdd_name_insert,
2057         .mdo_name_remove   = mdd_name_remove,
2058         .mdo_rename_tgt    = mdd_rename_tgt,
2059         .mdo_create_data   = mdd_create_data
2060 };