Whamcloud - gitweb
b=19188
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int mdd_links_add(const struct lu_env *env,
75                          struct mdd_object *mdd_obj,
76                          const struct lu_fid *pfid,
77                          const struct lu_name *lname,
78                          struct thandle *handle);
79 static int mdd_links_rename(const struct lu_env *env,
80                             struct mdd_object *mdd_obj,
81                             const struct lu_fid *oldpfid,
82                             const struct lu_name *oldlname,
83                             const struct lu_fid *newpfid,
84                             const struct lu_name *newlname,
85                             struct thandle *handle);
86
87 static int
88 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
89                     const struct lu_name *lname, struct lu_fid* fid, int mask)
90 {
91         const char *name = lname->ln_name;
92         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
93         struct dynlock_handle *dlh;
94         int rc;
95
96         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
97         if (unlikely(dlh == NULL))
98                 return -ENOMEM;
99         rc = __mdd_lookup(env, pobj, lname, fid, mask);
100         mdd_pdo_read_unlock(env, mdd_obj, dlh);
101
102         return rc;
103 }
104
105 int mdd_lookup(const struct lu_env *env,
106                struct md_object *pobj, const struct lu_name *lname,
107                struct lu_fid* fid, struct md_op_spec *spec)
108 {
109         int rc;
110         ENTRY;
111         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
112         RETURN(rc);
113 }
114
115 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
116                           struct lu_fid *fid)
117 {
118         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
119 }
120
121 /*
122  * For root fid use special function, which does not compare version component
123  * of fid. Version component is different for root fids on all MDTs.
124  */
125 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
126 {
127         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
128                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
129 }
130
131 /*
132  * return 1: if lf is the fid of the ancestor of p1;
133  * return 0: if not;
134  *
135  * return -EREMOTE: if remote object is found, in this
136  * case fid of remote object is saved to @pf;
137  *
138  * otherwise: values < 0, errors.
139  */
140 static int mdd_is_parent(const struct lu_env *env,
141                          struct mdd_device *mdd,
142                          struct mdd_object *p1,
143                          const struct lu_fid *lf,
144                          struct lu_fid *pf)
145 {
146         struct mdd_object *parent = NULL;
147         struct lu_fid *pfid;
148         int rc;
149         ENTRY;
150
151         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
152         pfid = &mdd_env_info(env)->mti_fid;
153
154         /* Check for root first. */
155         if (mdd_is_root(mdd, mdo2fid(p1)))
156                 RETURN(0);
157
158         for(;;) {
159                 /* this is done recursively, bypass capa for each obj */
160                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
161                 rc = mdd_parent_fid(env, p1, pfid);
162                 if (rc)
163                         GOTO(out, rc);
164                 if (mdd_is_root(mdd, pfid))
165                         GOTO(out, rc = 0);
166                 if (lu_fid_eq(pfid, lf))
167                         GOTO(out, rc = 1);
168                 if (parent)
169                         mdd_object_put(env, parent);
170                 parent = mdd_object_find(env, mdd, pfid);
171
172                 /* cross-ref parent */
173                 if (parent == NULL) {
174                         if (pf != NULL)
175                                 *pf = *pfid;
176                         GOTO(out, rc = -EREMOTE);
177                 } else if (IS_ERR(parent))
178                         GOTO(out, rc = PTR_ERR(parent));
179                 p1 = parent;
180         }
181         EXIT;
182 out:
183         if (parent && !IS_ERR(parent))
184                 mdd_object_put(env, parent);
185         return rc;
186 }
187
188 /*
189  * No permission check is needed.
190  *
191  * returns 1: if fid is ancestor of @mo;
192  * returns 0: if fid is not a ancestor of @mo;
193  *
194  * returns EREMOTE if remote object is found, fid of remote object is saved to
195  * @fid;
196  *
197  * returns < 0: if error
198  */
199 static int mdd_is_subdir(const struct lu_env *env,
200                          struct md_object *mo, const struct lu_fid *fid,
201                          struct lu_fid *sfid)
202 {
203         struct mdd_device *mdd = mdo2mdd(mo);
204         int rc;
205         ENTRY;
206
207         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
208                 RETURN(0);
209
210         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
211         if (rc == 0) {
212                 /* found root */
213                 fid_zero(sfid);
214         } else if (rc == 1) {
215                 /* found @fid is parent */
216                 *sfid = *fid;
217                 rc = 0;
218         }
219         RETURN(rc);
220 }
221
222 /*
223  * Check that @dir contains no entries except (possibly) dot and dotdot.
224  *
225  * Returns:
226  *
227  *             0        empty
228  *      -ENOTDIR        not a directory object
229  *    -ENOTEMPTY        not empty
230  *           -ve        other error
231  *
232  */
233 static int mdd_dir_is_empty(const struct lu_env *env,
234                             struct mdd_object *dir)
235 {
236         struct dt_it     *it;
237         struct dt_object *obj;
238         const struct dt_it_ops *iops;
239         int result;
240         ENTRY;
241
242         obj = mdd_object_child(dir);
243         if (!dt_try_as_dir(env, obj))
244                 RETURN(-ENOTDIR);
245
246         iops = &obj->do_index_ops->dio_it;
247         it = iops->init(env, obj, BYPASS_CAPA);
248         if (it != NULL) {
249                 result = iops->get(env, it, (const void *)"");
250                 if (result > 0) {
251                         int i;
252                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
253                                 result = iops->next(env, it);
254                         if (result == 0)
255                                 result = -ENOTEMPTY;
256                         else if (result == +1)
257                                 result = 0;
258                 } else if (result == 0)
259                         /*
260                          * Huh? Index contains no zero key?
261                          */
262                         result = -EIO;
263
264                 iops->put(env, it);
265                 iops->fini(env, it);
266         } else
267                 result = -ENOMEM;
268         RETURN(result);
269 }
270
271 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
272 {
273         struct mdd_device *m = mdd_obj2mdd_dev(obj);
274         struct lu_attr *la = &mdd_env_info(env)->mti_la;
275         int rc;
276         ENTRY;
277
278         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
279         if (rc)
280                 RETURN(rc);
281
282         /*
283          * Subdir count limitation can be broken through.
284          */
285         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
286             !S_ISDIR(la->la_mode))
287                 RETURN(-EMLINK);
288         else
289                 RETURN(0);
290 }
291
292 /*
293  * Check whether it may create the cobj under the pobj.
294  * cobj maybe NULL
295  */
296 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
297                    struct mdd_object *cobj, int check_perm, int check_nlink)
298 {
299         int rc = 0;
300         ENTRY;
301
302         if (cobj && mdd_object_exists(cobj))
303                 RETURN(-EEXIST);
304
305         if (mdd_is_dead_obj(pobj))
306                 RETURN(-ENOENT);
307
308         if (check_perm)
309                 rc = mdd_permission_internal_locked(env, pobj, NULL,
310                                                     MAY_WRITE | MAY_EXEC,
311                                                     MOR_TGT_PARENT);
312
313         if (!rc && check_nlink)
314                 rc = __mdd_may_link(env, pobj);
315
316         RETURN(rc);
317 }
318
319 /*
320  * Check whether can unlink from the pobj in the case of "cobj == NULL".
321  */
322 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
323                    const struct md_attr *ma)
324 {
325         int rc;
326         ENTRY;
327
328         if (mdd_is_dead_obj(pobj))
329                 RETURN(-ENOENT);
330
331         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
332             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
333                 RETURN(-EPERM);
334
335         rc = mdd_permission_internal_locked(env, pobj, NULL,
336                                             MAY_WRITE | MAY_EXEC,
337                                             MOR_TGT_PARENT);
338         if (rc)
339                 RETURN(rc);
340
341         if (mdd_is_append(pobj))
342                 RETURN(-EPERM);
343
344         RETURN(rc);
345 }
346
347 /*
348  * pobj == NULL is remote ops case, under such case, pobj's
349  * VTX feature has been checked already, no need check again.
350  */
351 static inline int mdd_is_sticky(const struct lu_env *env,
352                                 struct mdd_object *pobj,
353                                 struct mdd_object *cobj)
354 {
355         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
356         struct md_ucred *uc = md_ucred(env);
357         int rc;
358
359         if (pobj) {
360                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
361                 if (rc)
362                         return rc;
363
364                 if (!(tmp_la->la_mode & S_ISVTX) ||
365                      (tmp_la->la_uid == uc->mu_fsuid))
366                         return 0;
367         }
368
369         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
370         if (rc)
371                 return rc;
372
373         if (tmp_la->la_uid == uc->mu_fsuid)
374                 return 0;
375
376         return !mdd_capable(uc, CFS_CAP_FOWNER);
377 }
378
379 /*
380  * Check whether it may delete the cobj from the pobj.
381  * pobj maybe NULL
382  */
383 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
384                    struct mdd_object *cobj, struct md_attr *ma,
385                    int check_perm, int check_empty)
386 {
387         int rc = 0;
388         ENTRY;
389
390         LASSERT(cobj);
391         if (!mdd_object_exists(cobj))
392                 RETURN(-ENOENT);
393
394         if (mdd_is_dead_obj(cobj))
395                 RETURN(-ESTALE);
396
397         if (pobj) {
398                 if (!mdd_object_exists(pobj))
399                         RETURN(-ENOENT);
400
401                 if (mdd_is_dead_obj(pobj))
402                         RETURN(-ENOENT);
403
404                 if (check_perm) {
405                         rc = mdd_permission_internal_locked(env, pobj, NULL,
406                                                     MAY_WRITE | MAY_EXEC,
407                                                     MOR_TGT_PARENT);
408                         if (rc)
409                                 RETURN(rc);
410                 }
411
412                 if (mdd_is_append(pobj))
413                         RETURN(-EPERM);
414         }
415
416         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
417             mdd_is_sticky(env, pobj, cobj))
418                 RETURN(-EPERM);
419
420         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
421                 RETURN(-EPERM);
422
423         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
424             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
425                 RETURN(-EPERM);
426
427         if (S_ISDIR(ma->ma_attr.la_mode)) {
428                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
429
430                 if (!S_ISDIR(mdd_object_type(cobj)))
431                         RETURN(-ENOTDIR);
432
433                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
434                         RETURN(-EBUSY);
435         } else if (S_ISDIR(mdd_object_type(cobj)))
436                 RETURN(-EISDIR);
437
438         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
439                 rc = mdd_dir_is_empty(env, cobj);
440
441         RETURN(rc);
442 }
443
444 /*
445  * tgt maybe NULL
446  * has mdd_write_lock on src already, but not on tgt yet
447  */
448 int mdd_link_sanity_check(const struct lu_env *env,
449                           struct mdd_object *tgt_obj,
450                           const struct lu_name *lname,
451                           struct mdd_object *src_obj)
452 {
453         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
454         int rc = 0;
455         ENTRY;
456
457         if (!mdd_object_exists(src_obj))
458                 RETURN(-ENOENT);
459
460         if (mdd_is_dead_obj(src_obj))
461                 RETURN(-ESTALE);
462
463         /* Local ops, no lookup before link, check filename length here. */
464         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
465                 RETURN(-ENAMETOOLONG);
466
467         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
468                 RETURN(-EPERM);
469
470         if (S_ISDIR(mdd_object_type(src_obj)))
471                 RETURN(-EPERM);
472
473         LASSERT(src_obj != tgt_obj);
474         if (tgt_obj) {
475                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
476                 if (rc)
477                         RETURN(rc);
478         }
479
480         rc = __mdd_may_link(env, src_obj);
481
482         RETURN(rc);
483 }
484
485 /**
486  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
487  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
488  * (maybe too large to be represented). It is a trick to break through the
489  * "i_nlink" limitation for subdir count.
490  */
491 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
492                    struct thandle *handle)
493 {
494         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
495         struct mdd_device *m = mdd_obj2mdd_dev(obj);
496
497         if (!mdd_is_mnlink(obj)) {
498                 if (S_ISDIR(mdd_object_type(obj))) {
499                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
500                                 return;
501
502                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
503                                 obj->mod_flags |= MNLINK_OBJ;
504                                 tmp_la->la_nlink = 1;
505                                 tmp_la->la_valid = LA_NLINK;
506                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
507                                                       0);
508                                 return;
509                         }
510                 }
511                 mdo_ref_add(env, obj, handle);
512         }
513 }
514
515 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
516                    struct thandle *handle, int is_dot)
517 {
518         if (!mdd_is_mnlink(obj) || is_dot)
519                 mdo_ref_del(env, obj, handle);
520 }
521
522 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
523                                    const char *name, struct thandle *handle,
524                                    struct lustre_capa *capa)
525 {
526         struct dt_object *next = mdd_object_child(pobj);
527         int               rc;
528         ENTRY;
529
530         if (dt_try_as_dir(env, next)) {
531                 rc = next->do_index_ops->dio_delete(env, next,
532                                                     (struct dt_key *)name,
533                                                     handle, capa);
534         } else
535                 rc = -ENOTDIR;
536
537         RETURN(rc);
538 }
539
540 static int __mdd_index_insert_only(const struct lu_env *env,
541                                    struct mdd_object *pobj,
542                                    const struct lu_fid *lf, const char *name,
543                                    struct thandle *handle,
544                                    struct lustre_capa *capa)
545 {
546         struct dt_object *next = mdd_object_child(pobj);
547         int               rc;
548         ENTRY;
549
550         if (dt_try_as_dir(env, next)) {
551                 struct md_ucred  *uc = md_ucred(env);
552
553                 rc = next->do_index_ops->dio_insert(env, next,
554                                                     __mdd_fid_rec(env, lf),
555                                                     (const struct dt_key *)name,
556                                                     handle, capa, uc->mu_cap &
557                                                     CFS_CAP_SYS_RESOURCE_MASK);
558         } else {
559                 rc = -ENOTDIR;
560         }
561         RETURN(rc);
562 }
563
564 /* insert named index, add reference if isdir */
565 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
566                               const struct lu_fid *lf, const char *name, int is_dir,
567                               struct thandle *handle, struct lustre_capa *capa)
568 {
569         int               rc;
570         ENTRY;
571
572         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
573         if (rc == 0 && is_dir) {
574                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
575                 __mdd_ref_add(env, pobj, handle);
576                 mdd_write_unlock(env, pobj);
577         }
578         RETURN(rc);
579 }
580
581 /* delete named index, drop reference if isdir */
582 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
583                               const char *name, int is_dir, struct thandle *handle,
584                               struct lustre_capa *capa)
585 {
586         int               rc;
587         ENTRY;
588
589         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
590         if (rc == 0 && is_dir) {
591                 int is_dot = 0;
592
593                 if (name != NULL && name[0] == '.' && name[1] == 0)
594                         is_dot = 1;
595                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
596                 __mdd_ref_del(env, pobj, handle, is_dot);
597                 mdd_write_unlock(env, pobj);
598         }
599
600         RETURN(rc);
601 }
602
603
604 /** Store a namespace change changelog record
605  * If this fails, we must fail the whole transaction; we don't
606  * want the change to commit without the log entry.
607  * \param target - mdd_object of change
608  * \param parent - parent dir/object
609  * \param tf - target lu_fid, overrides fid of \a target if this is non-null
610  * \param tname - target name string
611  * \param handle - transacion handle
612  */
613 static int mdd_changelog_ns_store(const struct lu_env  *env,
614                                   struct mdd_device    *mdd,
615                                   enum changelog_rec_type type,
616                                   struct mdd_object    *target,
617                                   struct mdd_object    *parent,
618                                   const struct lu_fid  *tf,
619                                   const struct lu_name *tname,
620                                   struct thandle *handle)
621 {
622         const struct lu_fid *tfid;
623         const struct lu_fid *tpfid = mdo2fid(parent);
624         struct llog_changelog_rec *rec;
625         struct lu_buf *buf;
626         int reclen;
627         int rc;
628         ENTRY;
629
630         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
631                 RETURN(0);
632
633         LASSERT(parent != NULL);
634         LASSERT(tname != NULL);
635         LASSERT(handle != NULL);
636
637         /* target */
638         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
639         buf = mdd_buf_alloc(env, reclen);
640         if (buf->lb_buf == NULL)
641                 RETURN(-ENOMEM);
642         rec = (struct llog_changelog_rec *)buf->lb_buf;
643
644         rec->cr_flags = CLF_VERSION;
645         rec->cr_type = (__u32)type;
646         tfid = tf ? tf : mdo2fid(target);
647         rec->cr_tfid = *tfid;
648         rec->cr_pfid = *tpfid;
649         rec->cr_namelen = tname->ln_namelen;
650         memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen);
651         if (likely(target))
652                 target->mod_cltime = cfs_time_current_64();
653
654         rc = mdd_changelog_llog_write(mdd, rec, handle);
655         if (rc < 0) {
656                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
657                        rc, type, tname->ln_name, PFID(tfid), PFID(tpfid));
658                 return -EFAULT;
659         }
660
661         return 0;
662 }
663
664 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
665                     struct md_object *src_obj, const struct lu_name *lname,
666                     struct md_attr *ma)
667 {
668         const char *name = lname->ln_name;
669         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
670         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
671         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
672         struct mdd_device *mdd = mdo2mdd(src_obj);
673         struct dynlock_handle *dlh;
674         struct thandle *handle;
675 #ifdef HAVE_QUOTA_SUPPORT
676         struct obd_device *obd = mdd->mdd_obd_dev;
677         struct mds_obd *mds = &obd->u.mds;
678         unsigned int qids[MAXQUOTAS] = { 0, 0 };
679         int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
680 #endif
681         int rc;
682         ENTRY;
683
684 #ifdef HAVE_QUOTA_SUPPORT
685         if (mds->mds_quota) {
686                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
687
688                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
689                 if (!rc) {
690                         void *data = NULL;
691                         mdd_data_get(env, mdd_tobj, &data);
692                         quota_opc = FSFILT_OP_LINK;
693                         mdd_quota_wrapper(la_tmp, qids);
694                         /* get block quota for parent */
695                         lquota_chkquota(mds_quota_interface_ref, obd,
696                                         qids, rec_pending, 1, NULL,
697                                         LQUOTA_FLAGS_BLK, data, 1);
698                 }
699         }
700 #endif
701
702         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
703         handle = mdd_trans_start(env, mdd);
704         if (IS_ERR(handle))
705                 GOTO(out_pending, rc = PTR_ERR(handle));
706
707         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
708         if (dlh == NULL)
709                 GOTO(out_trans, rc = -ENOMEM);
710         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
711
712         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
713         if (rc)
714                 GOTO(out_unlock, rc);
715
716         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
717                                      name, handle,
718                                      mdd_object_capa(env, mdd_tobj));
719         if (rc)
720                 GOTO(out_unlock, rc);
721
722         __mdd_ref_add(env, mdd_sobj, handle);
723
724         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
725         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
726
727         la->la_valid = LA_CTIME | LA_MTIME;
728         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
729         if (rc)
730                 GOTO(out_unlock, rc);
731
732         la->la_valid = LA_CTIME;
733         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
734         if (rc == 0)
735                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle);
736
737         EXIT;
738 out_unlock:
739         mdd_write_unlock(env, mdd_sobj);
740         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
741 out_trans:
742         if (rc == 0)
743                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj,
744                                             mdd_tobj, NULL, lname, handle);
745         mdd_trans_stop(env, mdd, rc, handle);
746 out_pending:
747 #ifdef HAVE_QUOTA_SUPPORT
748         if (quota_opc) {
749                 lquota_pending_commit(mds_quota_interface_ref, obd,
750                                       qids, rec_pending, 1);
751                 /* Trigger dqacq for the parent owner. If failed,
752                  * the next call for lquota_chkquota will process it. */
753                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
754                               quota_opc);
755         }
756 #endif
757         return rc;
758 }
759
760 /* caller should take a lock before calling */
761 int mdd_finish_unlink(const struct lu_env *env,
762                       struct mdd_object *obj, struct md_attr *ma,
763                       struct thandle *th)
764 {
765         int rc;
766         int reset = 1;
767         ENTRY;
768
769         rc = mdd_iattr_get(env, obj, ma);
770         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
771                 /* add new orphan and the object
772                  * will be deleted during mdd_close() */
773                 if (obj->mod_count) {
774                         rc = __mdd_orphan_add(env, obj, th);
775                         if (rc == 0) {
776                                 obj->mod_flags |= ORPHAN_OBJ;
777                                 CDEBUG(D_HA, "Object "DFID" is going to be "
778                                         "an orphan, open count = %d\n",
779                                         PFID(mdd_object_fid(obj)),
780                                         obj->mod_count);
781                         }
782                 }
783
784                 obj->mod_flags |= DEAD_OBJ;
785                 if (!(obj->mod_flags & ORPHAN_OBJ)) {
786                         rc = mdd_object_kill(env, obj, ma);
787                         if (rc == 0)
788                                 reset = 0;
789                 }
790
791         }
792         if (reset)
793                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
794
795         RETURN(rc);
796 }
797
798 /*
799  * pobj maybe NULL
800  * has mdd_write_lock on cobj already, but not on pobj yet
801  */
802 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
803                             struct mdd_object *cobj, struct md_attr *ma)
804 {
805         int rc;
806         ENTRY;
807
808         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
809
810         RETURN(rc);
811 }
812
813 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
814                       struct md_object *cobj, const struct lu_name *lname,
815                       struct md_attr *ma)
816 {
817         const char *name = lname->ln_name;
818         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
819         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
820         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
821         struct mdd_device *mdd = mdo2mdd(pobj);
822         struct dynlock_handle *dlh;
823         struct thandle    *handle;
824 #ifdef HAVE_QUOTA_SUPPORT
825         struct obd_device *obd = mdd->mdd_obd_dev;
826         struct mds_obd *mds = &obd->u.mds;
827         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
828         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
829         int quota_opc = 0;
830 #endif
831         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
832         int rc;
833         ENTRY;
834
835         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
836                  PFID(mdd_object_fid(mdd_cobj)));
837
838         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
839         if (rc)
840                 RETURN(rc);
841
842         handle = mdd_trans_start(env, mdd);
843         if (IS_ERR(handle))
844                 RETURN(PTR_ERR(handle));
845
846         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
847         if (dlh == NULL)
848                 GOTO(out_trans, rc = -ENOMEM);
849         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
850
851         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
852         if (rc)
853                 GOTO(cleanup, rc);
854
855         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
856                                 mdd_object_capa(env, mdd_pobj));
857         if (rc)
858                 GOTO(cleanup, rc);
859
860         __mdd_ref_del(env, mdd_cobj, handle, 0);
861         if (is_dir)
862                 /* unlink dot */
863                 __mdd_ref_del(env, mdd_cobj, handle, 1);
864
865         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
866         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
867
868         la->la_valid = LA_CTIME | LA_MTIME;
869         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
870         if (rc)
871                 GOTO(cleanup, rc);
872
873         la->la_valid = LA_CTIME;
874         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
875         if (rc)
876                 GOTO(cleanup, rc);
877
878         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
879 #ifdef HAVE_QUOTA_SUPPORT
880         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
881             ma->ma_attr.la_nlink == 0) {
882                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
883
884                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
885                 if (!rc) {
886                         mdd_quota_wrapper(la_tmp, qpids);
887                         if (mdd_cobj->mod_count == 0) {
888                                 quota_opc = FSFILT_OP_UNLINK;
889                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
890                         } else {
891                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
892                         }
893                 }
894         }
895 #endif
896
897         if (rc == 0)
898                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
899                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
900                                    NULL, NULL);
901         if (!is_dir)
902                 /* old files may not have link ea; ignore errors */
903                 mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj),
904                                  lname, NULL, NULL, handle);
905
906         EXIT;
907 cleanup:
908         mdd_write_unlock(env, mdd_cobj);
909         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
910 out_trans:
911         if (rc == 0)
912                 rc = mdd_changelog_ns_store(env, mdd,
913                                             is_dir ? CL_RMDIR : CL_UNLINK,
914                                             mdd_cobj, mdd_pobj, NULL, lname,
915                                             handle);
916
917         mdd_trans_stop(env, mdd, rc, handle);
918 #ifdef HAVE_QUOTA_SUPPORT
919         if (quota_opc)
920                 /* Trigger dqrel on the owner of child and parent. If failed,
921                  * the next call for lquota_chkquota will process it. */
922                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
923                               quota_opc);
924 #endif
925         return rc;
926 }
927
928 /* has not lock on pobj yet */
929 static int mdd_ni_sanity_check(const struct lu_env *env,
930                                struct md_object *pobj,
931                                const struct md_attr *ma)
932 {
933         struct mdd_object *obj = md2mdd_obj(pobj);
934         int rc;
935         ENTRY;
936
937         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
938                 RETURN(0);
939
940         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
941
942         RETURN(rc);
943 }
944
945 /*
946  * Partial operation.
947  */
948 static int mdd_name_insert(const struct lu_env *env,
949                            struct md_object *pobj,
950                            const struct lu_name *lname,
951                            const struct lu_fid *fid,
952                            const struct md_attr *ma)
953 {
954         const char *name = lname->ln_name;
955         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
956         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
957         struct mdd_device *mdd = mdo2mdd(pobj);
958         struct dynlock_handle *dlh;
959         struct thandle *handle;
960         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
961 #ifdef HAVE_QUOTA_SUPPORT
962         struct md_ucred *uc = md_ucred(env);
963         struct obd_device *obd = mdd->mdd_obd_dev;
964         struct mds_obd *mds = &obd->u.mds;
965         unsigned int qids[MAXQUOTAS] = { 0, 0 };
966         int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
967         cfs_cap_t save = uc->mu_cap;
968 #endif
969         int rc;
970         ENTRY;
971
972 #ifdef HAVE_QUOTA_SUPPORT
973         if (mds->mds_quota) {
974                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
975                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
976
977                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
978                         if (!rc) {
979                                 void *data = NULL;
980                                 mdd_data_get(env, mdd_obj, &data);
981                                 quota_opc = FSFILT_OP_LINK;
982                                 mdd_quota_wrapper(la_tmp, qids);
983                                 /* get block quota for parent */
984                                 lquota_chkquota(mds_quota_interface_ref, obd,
985                                                 qids, rec_pending, 1, NULL,
986                                                 LQUOTA_FLAGS_BLK, data, 1);
987                         }
988                 } else {
989                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
990                 }
991         }
992 #endif
993         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
994         handle = mdd_trans_start(env, mdo2mdd(pobj));
995         if (IS_ERR(handle))
996                 GOTO(out_pending, rc = PTR_ERR(handle));
997
998         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
999         if (dlh == NULL)
1000                 GOTO(out_trans, rc = -ENOMEM);
1001
1002         rc = mdd_ni_sanity_check(env, pobj, ma);
1003         if (rc)
1004                 GOTO(out_unlock, rc);
1005
1006         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
1007                                 handle, BYPASS_CAPA);
1008         if (rc)
1009                 GOTO(out_unlock, rc);
1010
1011         /*
1012          * For some case, no need update obj's ctime (LA_CTIME is not set),
1013          * e.g. split_dir.
1014          * For other cases, update obj's ctime (LA_CTIME is set),
1015          * e.g. cmr_link.
1016          */
1017         if (ma->ma_attr.la_valid & LA_CTIME) {
1018                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1019                 la->la_valid = LA_CTIME | LA_MTIME;
1020                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1021                                                         handle, 0);
1022         }
1023         EXIT;
1024 out_unlock:
1025         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1026 out_trans:
1027         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
1028 out_pending:
1029 #ifdef HAVE_QUOTA_SUPPORT
1030         if (mds->mds_quota) {
1031                 if (quota_opc) {
1032                         lquota_pending_commit(mds_quota_interface_ref,
1033                                               obd, qids, rec_pending, 1);
1034                         /* Trigger dqacq for the parent owner. If failed,
1035                          * the next call for lquota_chkquota will process it*/
1036                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
1037                                       rc, quota_opc);
1038                 } else {
1039                         uc->mu_cap = save;
1040                 }
1041         }
1042 #endif
1043         return rc;
1044 }
1045
1046 /* has not lock on pobj yet */
1047 static int mdd_nr_sanity_check(const struct lu_env *env,
1048                                struct md_object *pobj,
1049                                const struct md_attr *ma)
1050 {
1051         struct mdd_object *obj = md2mdd_obj(pobj);
1052         int rc;
1053         ENTRY;
1054
1055         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
1056                 RETURN(0);
1057
1058         rc = mdd_may_unlink(env, obj, ma);
1059
1060         RETURN(rc);
1061 }
1062
1063 /*
1064  * Partial operation.
1065  */
1066 static int mdd_name_remove(const struct lu_env *env,
1067                            struct md_object *pobj,
1068                            const struct lu_name *lname,
1069                            const struct md_attr *ma)
1070 {
1071         const char *name = lname->ln_name;
1072         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1073         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1074         struct mdd_device *mdd = mdo2mdd(pobj);
1075         struct dynlock_handle *dlh;
1076         struct thandle *handle;
1077         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1078 #ifdef HAVE_QUOTA_SUPPORT
1079         struct obd_device *obd = mdd->mdd_obd_dev;
1080         struct mds_obd *mds = &obd->u.mds;
1081         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1082         int quota_opc = 0;
1083 #endif
1084         int rc;
1085         ENTRY;
1086
1087 #ifdef HAVE_QUOTA_SUPPORT
1088         if (mds->mds_quota) {
1089                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1090
1091                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1092                 if (!rc) {
1093                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
1094                         mdd_quota_wrapper(la_tmp, qids);
1095                 }
1096         }
1097 #endif
1098         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
1099         handle = mdd_trans_start(env, mdd);
1100         if (IS_ERR(handle))
1101                 GOTO(out_pending, rc = PTR_ERR(handle));
1102
1103         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1104         if (dlh == NULL)
1105                 GOTO(out_trans, rc = -ENOMEM);
1106
1107         rc = mdd_nr_sanity_check(env, pobj, ma);
1108         if (rc)
1109                 GOTO(out_unlock, rc);
1110
1111         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1112                                 handle, BYPASS_CAPA);
1113         if (rc)
1114                 GOTO(out_unlock, rc);
1115
1116         /*
1117          * For some case, no need update obj's ctime (LA_CTIME is not set),
1118          * e.g. split_dir.
1119          * For other cases, update obj's ctime (LA_CTIME is set),
1120          * e.g. cmr_unlink.
1121          */
1122         if (ma->ma_attr.la_valid & LA_CTIME) {
1123                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1124                 la->la_valid = LA_CTIME | LA_MTIME;
1125                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1126                                                         handle, 0);
1127         }
1128         EXIT;
1129 out_unlock:
1130         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1131 out_trans:
1132         mdd_trans_stop(env, mdd, rc, handle);
1133 out_pending:
1134 #ifdef HAVE_QUOTA_SUPPORT
1135         /* Trigger dqrel for the parent owner.
1136          * If failed, the next call for lquota_chkquota will process it. */
1137         if (quota_opc)
1138                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1139                               quota_opc);
1140 #endif
1141         return rc;
1142 }
1143
1144 /*
1145  * tobj maybe NULL
1146  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1147  */
1148 static int mdd_rt_sanity_check(const struct lu_env *env,
1149                                struct mdd_object *tgt_pobj,
1150                                struct mdd_object *tobj,
1151                                struct md_attr *ma)
1152 {
1153         int rc;
1154         ENTRY;
1155
1156         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1157                 RETURN(0);
1158
1159         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1160          * exist. In fact, tobj must exist, otherwise the call trace will be:
1161          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1162          * When get here, tobj must be NOT NULL, the other case has been
1163          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1164          * MDS_PERM_BYPASS.
1165          * So check may_delete, but not check nlink of tgt_pobj. */
1166
1167         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1168
1169         RETURN(rc);
1170 }
1171
1172 /* Partial rename op on slave MDD */
1173 static int mdd_rename_tgt(const struct lu_env *env,
1174                           struct md_object *pobj, struct md_object *tobj,
1175                           const struct lu_fid *lf, const struct lu_name *lname,
1176                           struct md_attr *ma)
1177 {
1178         const char *name = lname->ln_name;
1179         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1180         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1181         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1182         struct mdd_device *mdd = mdo2mdd(pobj);
1183         struct dynlock_handle *dlh;
1184         struct thandle *handle;
1185 #ifdef HAVE_QUOTA_SUPPORT
1186         struct obd_device *obd = mdd->mdd_obd_dev;
1187         struct mds_obd *mds = &obd->u.mds;
1188         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1189         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1190         int quota_copc = 0, quota_popc = 0;
1191         int rec_pending[MAXQUOTAS] = { 0, 0 };
1192 #endif
1193         int rc;
1194         ENTRY;
1195
1196 #ifdef HAVE_QUOTA_SUPPORT
1197         if (mds->mds_quota && !tobj) {
1198                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1199
1200                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1201                 if (!rc) {
1202                         void *data = NULL;
1203                         mdd_data_get(env, mdd_tpobj, &data);
1204                         quota_popc = FSFILT_OP_LINK;
1205                         mdd_quota_wrapper(la_tmp, qpids);
1206                         /* get block quota for target parent */
1207                         lquota_chkquota(mds_quota_interface_ref, obd,
1208                                         qpids, rec_pending, 1, NULL,
1209                                         LQUOTA_FLAGS_BLK, data, 1);
1210                 }
1211         }
1212 #endif
1213         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
1214         handle = mdd_trans_start(env, mdd);
1215         if (IS_ERR(handle))
1216                 GOTO(out_pending, rc = PTR_ERR(handle));
1217
1218         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1219         if (dlh == NULL)
1220                 GOTO(out_trans, rc = -ENOMEM);
1221         if (tobj)
1222                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1223
1224         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1225         if (rc)
1226                 GOTO(cleanup, rc);
1227
1228         /*
1229          * If rename_tgt is called then we should just re-insert name with
1230          * correct fid, no need to dec/inc parent nlink if obj is dir.
1231          */
1232         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1233         if (rc)
1234                 GOTO(cleanup, rc);
1235
1236         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1237                                      BYPASS_CAPA);
1238         if (rc)
1239                 GOTO(cleanup, rc);
1240
1241         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1242         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1243
1244         la->la_valid = LA_CTIME | LA_MTIME;
1245         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1246         if (rc)
1247                 GOTO(cleanup, rc);
1248
1249         /*
1250          * For tobj is remote case cmm layer has processed
1251          * and pass NULL tobj to here. So when tobj is NOT NULL,
1252          * it must be local one.
1253          */
1254         if (tobj && mdd_object_exists(mdd_tobj)) {
1255                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1256
1257                 /* Remove dot reference. */
1258                 if (S_ISDIR(ma->ma_attr.la_mode))
1259                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1260
1261                 la->la_valid = LA_CTIME;
1262                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1263                 if (rc)
1264                         GOTO(cleanup, rc);
1265
1266                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1267                 if (rc)
1268                         GOTO(cleanup, rc);
1269
1270 #ifdef HAVE_QUOTA_SUPPORT
1271                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1272                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1273                         quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1274                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1275                 }
1276 #endif
1277         }
1278         EXIT;
1279 cleanup:
1280         if (tobj)
1281                 mdd_write_unlock(env, mdd_tobj);
1282         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1283 out_trans:
1284         if (rc == 0)
1285                 /* Bare EXT record with no RENAME in front of it signifies
1286                    a partial slave op */
1287                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
1288                                             mdd_tpobj, NULL, lname, handle);
1289
1290         mdd_trans_stop(env, mdd, rc, handle);
1291 out_pending:
1292 #ifdef HAVE_QUOTA_SUPPORT
1293         if (mds->mds_quota) {
1294                 if (quota_popc)
1295                         lquota_pending_commit(mds_quota_interface_ref, obd,
1296                                               qpids, rec_pending, 1);
1297
1298                 if (quota_copc)
1299                         /* Trigger dqrel on the target owner of child.
1300                          * If failed, the next call for lquota_chkquota
1301                          * will process it. */
1302                         lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids,
1303                                       rc, quota_copc);
1304         }
1305 #endif
1306         return rc;
1307 }
1308
1309 /*
1310  * The permission has been checked when obj created, no need check again.
1311  */
1312 static int mdd_cd_sanity_check(const struct lu_env *env,
1313                                struct mdd_object *obj)
1314 {
1315         ENTRY;
1316
1317         /* EEXIST check */
1318         if (!obj || mdd_is_dead_obj(obj))
1319                 RETURN(-ENOENT);
1320
1321         RETURN(0);
1322
1323 }
1324
1325 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1326                            struct md_object *cobj, const struct md_op_spec *spec,
1327                            struct md_attr *ma)
1328 {
1329         struct mdd_device *mdd = mdo2mdd(cobj);
1330         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1331         struct mdd_object *son = md2mdd_obj(cobj);
1332         struct lu_attr    *attr = &ma->ma_attr;
1333         struct lov_mds_md *lmm = NULL;
1334         int                lmm_size = 0;
1335         struct thandle    *handle;
1336         int                rc;
1337         ENTRY;
1338
1339         rc = mdd_cd_sanity_check(env, son);
1340         if (rc)
1341                 RETURN(rc);
1342
1343         if (!md_should_create(spec->sp_cr_flags))
1344                 RETURN(0);
1345         lmm_size = ma->ma_lmm_size;
1346         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1347                             spec, attr);
1348         if (rc)
1349                 RETURN(rc);
1350
1351         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1352         handle = mdd_trans_start(env, mdd);
1353         if (IS_ERR(handle))
1354                 GOTO(out_free, rc = PTR_ERR(handle));
1355
1356         /*
1357          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1358          * Should this be fixed?
1359          */
1360
1361         /* Replay creates has objects already */
1362 #if 0
1363         if (spec->no_create) {
1364                 CDEBUG(D_INFO, "we already have lov ea\n");
1365                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1366                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1367                                     spec->u.sp_ea.eadatalen, handle, 0);
1368         } else
1369 #endif
1370                 /* No need mdd_lsm_sanity_check here */
1371                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1372                                     lmm_size, handle, 0);
1373
1374         if (rc == 0)
1375                rc = mdd_attr_get_internal_locked(env, son, ma);
1376
1377         /* update lov_objid data, must be before transaction stop! */
1378         if (rc == 0)
1379                 mdd_lov_objid_update(mdd, lmm);
1380
1381         mdd_trans_stop(env, mdd, rc, handle);
1382 out_free:
1383         /* Finish mdd_lov_create() stuff. */
1384         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1385         RETURN(rc);
1386 }
1387
1388 /* Get fid from name and parent */
1389 static int
1390 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1391              const struct lu_name *lname, struct lu_fid* fid, int mask)
1392 {
1393         const char          *name = lname->ln_name;
1394         const struct dt_key *key = (const struct dt_key *)name;
1395         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1396         struct mdd_device   *m = mdo2mdd(pobj);
1397         struct dt_object    *dir = mdd_object_child(mdd_obj);
1398         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1399         int rc;
1400         ENTRY;
1401
1402         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1403                 RETURN(-ESTALE);
1404
1405         rc = mdd_object_exists(mdd_obj);
1406         if (unlikely(rc == 0))
1407                 RETURN(-ESTALE);
1408         else if (unlikely(rc < 0)) {
1409                 CERROR("Object "DFID" locates on remote server\n",
1410                         PFID(mdo2fid(mdd_obj)));
1411                 LBUG();
1412         }
1413
1414         /* The common filename length check. */
1415         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1416                 RETURN(-ENAMETOOLONG);
1417
1418         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1419                                             MOR_TGT_PARENT);
1420         if (rc)
1421                 RETURN(rc);
1422
1423         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1424                    dt_try_as_dir(env, dir))) {
1425
1426                 rc = dir->do_index_ops->dio_lookup(env, dir,
1427                                                  (struct dt_rec *)pack, key,
1428                                                  mdd_object_capa(env, mdd_obj));
1429                 if (rc > 0)
1430                         rc = fid_unpack(pack, fid);
1431                 else if (rc == 0)
1432                         rc = -ENOENT;
1433         } else
1434                 rc = -ENOTDIR;
1435
1436         RETURN(rc);
1437 }
1438
1439 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1440                           const struct lu_name *lname, struct mdd_object *child,
1441                           struct md_attr *ma, struct thandle *handle,
1442                           const struct md_op_spec *spec)
1443 {
1444         int rc;
1445         ENTRY;
1446
1447         /*
1448          * Update attributes for child.
1449          *
1450          * FIXME:
1451          *  (1) the valid bits should be converted between Lustre and Linux;
1452          *  (2) maybe, the child attributes should be set in OSD when creation.
1453          */
1454
1455         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1456         if (rc != 0)
1457                 RETURN(rc);
1458
1459         if (S_ISDIR(ma->ma_attr.la_mode)) {
1460                 /* Add "." and ".." for newly created dir */
1461                 __mdd_ref_add(env, child, handle);
1462                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1463                                              dot, handle, BYPASS_CAPA);
1464                 if (rc == 0) {
1465                         rc = __mdd_index_insert_only(env, child, pfid,
1466                                                      dotdot, handle,
1467                                                      BYPASS_CAPA);
1468                         if (rc != 0) {
1469                                 int rc2;
1470
1471                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1472                                                          handle, BYPASS_CAPA);
1473                                 if (rc2 != 0)
1474                                         CERROR("Failure to cleanup after dotdot"
1475                                                " creation: %d (%d)\n", rc2, rc);
1476                         }
1477                 }
1478         }
1479         if (rc == 0)
1480                 mdd_links_add(env, child, pfid, lname, handle);
1481
1482         RETURN(rc);
1483 }
1484
1485 /* has not lock on pobj yet */
1486 static int mdd_create_sanity_check(const struct lu_env *env,
1487                                    struct md_object *pobj,
1488                                    const struct lu_name *lname,
1489                                    struct md_attr *ma,
1490                                    struct md_op_spec *spec)
1491 {
1492         struct mdd_thread_info *info = mdd_env_info(env);
1493         struct lu_attr    *la        = &info->mti_la;
1494         struct lu_fid     *fid       = &info->mti_fid;
1495         struct mdd_object *obj       = md2mdd_obj(pobj);
1496         struct mdd_device *m         = mdo2mdd(pobj);
1497         int lookup                   = spec->sp_cr_lookup;
1498         int rc;
1499         ENTRY;
1500
1501         /* EEXIST check */
1502         if (mdd_is_dead_obj(obj))
1503                 RETURN(-ENOENT);
1504
1505         /*
1506          * In some cases this lookup is not needed - we know before if name
1507          * exists or not because MDT performs lookup for it.
1508          * name length check is done in lookup.
1509          */
1510         if (lookup) {
1511                 /*
1512                  * Check if the name already exist, though it will be checked in
1513                  * _index_insert also, for avoiding rolling back if exists
1514                  * _index_insert.
1515                  */
1516                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1517                                          MAY_WRITE | MAY_EXEC);
1518                 if (rc != -ENOENT)
1519                         RETURN(rc ? : -EEXIST);
1520         } else {
1521                 /*
1522                  * Check WRITE permission for the parent.
1523                  * EXEC permission have been checked
1524                  * when lookup before create already.
1525                  */
1526                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1527                                                     MOR_TGT_PARENT);
1528                 if (rc)
1529                         RETURN(rc);
1530         }
1531
1532         /* sgid check */
1533         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1534         if (rc != 0)
1535                 RETURN(rc);
1536
1537         if (la->la_mode & S_ISGID) {
1538                 ma->ma_attr.la_gid = la->la_gid;
1539                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1540                         ma->ma_attr.la_mode |= S_ISGID;
1541                         ma->ma_attr.la_valid |= LA_MODE;
1542                 }
1543         }
1544
1545         switch (ma->ma_attr.la_mode & S_IFMT) {
1546         case S_IFLNK: {
1547                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1548
1549                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1550                         RETURN(-ENAMETOOLONG);
1551                 else
1552                         RETURN(0);
1553         }
1554         case S_IFDIR:
1555         case S_IFREG:
1556         case S_IFCHR:
1557         case S_IFBLK:
1558         case S_IFIFO:
1559         case S_IFSOCK:
1560                 rc = 0;
1561                 break;
1562         default:
1563                 rc = -EINVAL;
1564                 break;
1565         }
1566         RETURN(rc);
1567 }
1568
1569 /*
1570  * Create object and insert it into namespace.
1571  */
1572 static int mdd_create(const struct lu_env *env,
1573                       struct md_object *pobj,
1574                       const struct lu_name *lname,
1575                       struct md_object *child,
1576                       struct md_op_spec *spec,
1577                       struct md_attr* ma)
1578 {
1579         struct mdd_thread_info *info = mdd_env_info(env);
1580         struct lu_attr         *la = &info->mti_la_for_fix;
1581         struct md_attr         *ma_acl = &info->mti_ma;
1582         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1583         struct mdd_object      *son = md2mdd_obj(child);
1584         struct mdd_device      *mdd = mdo2mdd(pobj);
1585         struct lu_attr         *attr = &ma->ma_attr;
1586         struct lov_mds_md      *lmm = NULL;
1587         struct thandle         *handle;
1588         struct dynlock_handle  *dlh;
1589         const char             *name = lname->ln_name;
1590         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1591         int got_def_acl = 0;
1592 #ifdef HAVE_QUOTA_SUPPORT
1593         struct obd_device *obd = mdd->mdd_obd_dev;
1594         struct mds_obd *mds = &obd->u.mds;
1595         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1596         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1597         int quota_opc = 0, block_count = 0;
1598         int inode_pending[MAXQUOTAS] = { 0, 0 };
1599         int block_pending[MAXQUOTAS] = { 0, 0 };
1600         int parent_pending[MAXQUOTAS] = { 0, 0 };
1601 #endif
1602         ENTRY;
1603
1604         /*
1605          * Two operations have to be performed:
1606          *
1607          *  - an allocation of a new object (->do_create()), and
1608          *
1609          *  - an insertion into a parent index (->dio_insert()).
1610          *
1611          * Due to locking, operation order is not important, when both are
1612          * successful, *but* error handling cases are quite different:
1613          *
1614          *  - if insertion is done first, and following object creation fails,
1615          *  insertion has to be rolled back, but this operation might fail
1616          *  also leaving us with dangling index entry.
1617          *
1618          *  - if creation is done first, is has to be undone if insertion
1619          *  fails, leaving us with leaked space, which is neither good, nor
1620          *  fatal.
1621          *
1622          * It seems that creation-first is simplest solution, but it is
1623          * sub-optimal in the frequent
1624          *
1625          *         $ mkdir foo
1626          *         $ mkdir foo
1627          *
1628          * case, because second mkdir is bound to create object, only to
1629          * destroy it immediately.
1630          *
1631          * To avoid this follow local file systems that do double lookup:
1632          *
1633          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1634          *
1635          *     1. create            (mdd_object_create_internal())
1636          *
1637          *     2. insert            (__mdd_index_insert(), lookup again)
1638          */
1639
1640         /* Sanity checks before big job. */
1641         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1642         if (rc)
1643                 RETURN(rc);
1644
1645 #ifdef HAVE_QUOTA_SUPPORT
1646         if (mds->mds_quota) {
1647                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1648
1649                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1650                 if (!rc) {
1651                         int same = 0;
1652
1653                         quota_opc = FSFILT_OP_CREATE;
1654                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1655                         mdd_quota_wrapper(la_tmp, qpids);
1656                         /* get file quota for child */
1657                         lquota_chkquota(mds_quota_interface_ref, obd, qcids,
1658                                         inode_pending, 1, NULL, 0, NULL, 0);
1659                         switch (ma->ma_attr.la_mode & S_IFMT) {
1660                         case S_IFLNK:
1661                         case S_IFDIR:
1662                                 block_count = 2;
1663                                 break;
1664                         case S_IFREG:
1665                                 block_count = 1;
1666                                 break;
1667                         }
1668                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1669                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1670                                 block_count += 1;
1671                                 same = 1;
1672                         }
1673                         /* get block quota for child and parent */
1674                         if (block_count)
1675                                 lquota_chkquota(mds_quota_interface_ref, obd,
1676                                                 qcids, block_pending,
1677                                                 block_count, NULL,
1678                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1679                         if (!same)
1680                                 lquota_chkquota(mds_quota_interface_ref, obd,
1681                                                 qpids, parent_pending, 1, NULL,
1682                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1683                 }
1684         }
1685 #endif
1686
1687         /*
1688          * No RPC inside the transaction, so OST objects should be created at
1689          * first.
1690          */
1691         if (S_ISREG(attr->la_mode)) {
1692                 lmm_size = ma->ma_lmm_size;
1693                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1694                                     spec, attr);
1695                 if (rc)
1696                         GOTO(out_pending, rc);
1697         }
1698
1699         if (!S_ISLNK(attr->la_mode)) {
1700                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1701                 ma_acl->ma_acl = info->mti_xattr_buf;
1702                 ma_acl->ma_need = MA_ACL_DEF;
1703                 ma_acl->ma_valid = 0;
1704
1705                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1706                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1707                 mdd_read_unlock(env, mdd_pobj);
1708                 if (rc)
1709                         GOTO(out_free, rc);
1710                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1711                         got_def_acl = 1;
1712         }
1713
1714         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1715         handle = mdd_trans_start(env, mdd);
1716         if (IS_ERR(handle))
1717                 GOTO(out_free, rc = PTR_ERR(handle));
1718
1719         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1720         if (dlh == NULL)
1721                 GOTO(out_trans, rc = -ENOMEM);
1722
1723         mdd_write_lock(env, son, MOR_TGT_CHILD);
1724         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
1725         if (rc) {
1726                 mdd_write_unlock(env, son);
1727                 GOTO(cleanup, rc);
1728         }
1729
1730         created = 1;
1731
1732 #ifdef CONFIG_FS_POSIX_ACL
1733         if (got_def_acl) {
1734                 struct lu_buf *acl_buf = &info->mti_buf;
1735                 acl_buf->lb_buf = ma_acl->ma_acl;
1736                 acl_buf->lb_len = ma_acl->ma_acl_size;
1737
1738                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1739                 if (rc) {
1740                         mdd_write_unlock(env, son);
1741                         GOTO(cleanup, rc);
1742                 } else {
1743                         ma->ma_attr.la_valid |= LA_MODE;
1744                 }
1745         }
1746 #endif
1747
1748         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
1749                                    son, ma, handle, spec);
1750         mdd_write_unlock(env, son);
1751         if (rc)
1752                 /*
1753                  * Object has no links, so it will be destroyed when last
1754                  * reference is released. (XXX not now.)
1755                  */
1756                 GOTO(cleanup, rc);
1757
1758         initialized = 1;
1759
1760         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1761                                 name, S_ISDIR(attr->la_mode), handle,
1762                                 mdd_object_capa(env, mdd_pobj));
1763
1764         if (rc)
1765                 GOTO(cleanup, rc);
1766
1767         inserted = 1;
1768
1769         /* No need mdd_lsm_sanity_check here */
1770         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1771         if (rc) {
1772                 CERROR("error on stripe info copy %d \n", rc);
1773                 GOTO(cleanup, rc);
1774         }
1775         if (lmm && lmm_size > 0) {
1776                 /* Set Lov here, do not get lmm again later */
1777                 memcpy(ma->ma_lmm, lmm, lmm_size);
1778                 ma->ma_lmm_size = lmm_size;
1779                 ma->ma_valid |= MA_LOV;
1780         }
1781
1782         if (S_ISLNK(attr->la_mode)) {
1783                 struct md_ucred  *uc = md_ucred(env);
1784                 struct dt_object *dt = mdd_object_child(son);
1785                 const char *target_name = spec->u.sp_symname;
1786                 int sym_len = strlen(target_name);
1787                 const struct lu_buf *buf;
1788                 loff_t pos = 0;
1789
1790                 buf = mdd_buf_get_const(env, target_name, sym_len);
1791                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1792                                                 mdd_object_capa(env, son),
1793                                                 uc->mu_cap &
1794                                                 CFS_CAP_SYS_RESOURCE_MASK);
1795
1796                 if (rc == sym_len)
1797                         rc = 0;
1798                 else
1799                         GOTO(cleanup, rc = -EFAULT);
1800         }
1801
1802         *la = ma->ma_attr;
1803         la->la_valid = LA_CTIME | LA_MTIME;
1804         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1805         if (rc)
1806                 GOTO(cleanup, rc);
1807
1808         /* Return attr back. */
1809         rc = mdd_attr_get_internal_locked(env, son, ma);
1810         EXIT;
1811 cleanup:
1812         if (rc && created) {
1813                 int rc2 = 0;
1814
1815                 if (inserted) {
1816                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1817                                                  S_ISDIR(attr->la_mode),
1818                                                  handle, BYPASS_CAPA);
1819                         if (rc2)
1820                                 CERROR("error can not cleanup destroy %d\n",
1821                                        rc2);
1822                 }
1823
1824                 if (rc2 == 0) {
1825                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1826                         __mdd_ref_del(env, son, handle, 0);
1827                         if (initialized && S_ISDIR(attr->la_mode))
1828                                 __mdd_ref_del(env, son, handle, 1);
1829                         mdd_write_unlock(env, son);
1830                 }
1831         }
1832
1833         /* update lov_objid data, must be before transaction stop! */
1834         if (rc == 0)
1835                 mdd_lov_objid_update(mdd, lmm);
1836
1837         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1838 out_trans:
1839         if (rc == 0)
1840                 rc = mdd_changelog_ns_store(env, mdd,
1841                             S_ISDIR(attr->la_mode) ? CL_MKDIR :
1842                             S_ISREG(attr->la_mode) ? CL_CREATE :
1843                             S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
1844                             son, mdd_pobj, NULL, lname, handle);
1845         mdd_trans_stop(env, mdd, rc, handle);
1846 out_free:
1847         /* finis lov_create stuff, free all temporary data */
1848         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1849 out_pending:
1850 #ifdef HAVE_QUOTA_SUPPORT
1851         if (quota_opc) {
1852                 lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
1853                                       inode_pending, 0);
1854                 lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
1855                                       block_pending, 1);
1856                 lquota_pending_commit(mds_quota_interface_ref, obd, qpids,
1857                                       parent_pending, 1);
1858                 /* Trigger dqacq on the owner of child and parent. If failed,
1859                  * the next call for lquota_chkquota will process it. */
1860                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1861                               quota_opc);
1862         }
1863 #endif
1864         return rc;
1865 }
1866
1867 /*
1868  * Get locks on parents in proper order
1869  * RETURN: < 0 - error, rename_order if successful
1870  */
1871 enum rename_order {
1872         MDD_RN_SAME,
1873         MDD_RN_SRCTGT,
1874         MDD_RN_TGTSRC
1875 };
1876
1877 static int mdd_rename_order(const struct lu_env *env,
1878                             struct mdd_device *mdd,
1879                             struct mdd_object *src_pobj,
1880                             struct mdd_object *tgt_pobj)
1881 {
1882         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1883         int rc;
1884         ENTRY;
1885
1886         if (src_pobj == tgt_pobj)
1887                 RETURN(MDD_RN_SAME);
1888
1889         /* compared the parent child relationship of src_p&tgt_p */
1890         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1891                 rc = MDD_RN_SRCTGT;
1892         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1893                 rc = MDD_RN_TGTSRC;
1894         } else {
1895                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1896                 if (rc == -EREMOTE)
1897                         rc = 0;
1898
1899                 if (rc == 1)
1900                         rc = MDD_RN_TGTSRC;
1901                 else if (rc == 0)
1902                         rc = MDD_RN_SRCTGT;
1903         }
1904
1905         RETURN(rc);
1906 }
1907
1908 /* has not mdd_write{read}_lock on any obj yet. */
1909 static int mdd_rename_sanity_check(const struct lu_env *env,
1910                                    struct mdd_object *src_pobj,
1911                                    struct mdd_object *tgt_pobj,
1912                                    struct mdd_object *sobj,
1913                                    struct mdd_object *tobj,
1914                                    struct md_attr *ma)
1915 {
1916         int rc = 0;
1917         ENTRY;
1918
1919         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1920                 RETURN(0);
1921
1922         /* XXX: when get here, sobj must NOT be NULL,
1923          * the other case has been processed in cml_rename
1924          * before mdd_rename and enable MDS_PERM_BYPASS. */
1925         LASSERT(sobj);
1926
1927         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1928         if (rc)
1929                 RETURN(rc);
1930
1931         /* XXX: when get here, "tobj == NULL" means tobj must
1932          * NOT exist (neither on remote MDS, such case has been
1933          * processed in cml_rename before mdd_rename and enable
1934          * MDS_PERM_BYPASS).
1935          * So check may_create, but not check may_unlink. */
1936         if (!tobj)
1937                 rc = mdd_may_create(env, tgt_pobj, NULL,
1938                                     (src_pobj != tgt_pobj), 0);
1939         else
1940                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1941                                     (src_pobj != tgt_pobj), 1);
1942
1943         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1944             S_ISDIR(ma->ma_attr.la_mode))
1945                 rc = __mdd_may_link(env, tgt_pobj);
1946
1947         RETURN(rc);
1948 }
1949
1950 /* src object can be remote that is why we use only fid and type of object */
1951 static int mdd_rename(const struct lu_env *env,
1952                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1953                       const struct lu_fid *lf, const struct lu_name *lsname,
1954                       struct md_object *tobj, const struct lu_name *ltname,
1955                       struct md_attr *ma)
1956 {
1957         const char *sname = lsname->ln_name;
1958         const char *tname = ltname->ln_name;
1959         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1960         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
1961         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1962         struct mdd_device *mdd = mdo2mdd(src_pobj);
1963         struct mdd_object *mdd_sobj = NULL;                  /* source object */
1964         struct mdd_object *mdd_tobj = NULL;
1965         struct dynlock_handle *sdlh, *tdlh;
1966         struct thandle *handle;
1967         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
1968         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
1969         int is_dir;
1970         int rc, rc2;
1971
1972 #ifdef HAVE_QUOTA_SUPPORT
1973         struct obd_device *obd = mdd->mdd_obd_dev;
1974         struct mds_obd *mds = &obd->u.mds;
1975         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
1976         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
1977         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
1978         int quota_copc = 0, quota_popc = 0;
1979         int rec_pending[MAXQUOTAS] = { 0, 0 };
1980 #endif
1981         ENTRY;
1982
1983         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1984         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1985
1986         if (tobj)
1987                 mdd_tobj = md2mdd_obj(tobj);
1988
1989 #ifdef HAVE_QUOTA_SUPPORT
1990         if (mds->mds_quota) {
1991                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1992
1993                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
1994                 if (!rc) {
1995                         mdd_quota_wrapper(la_tmp, qspids);
1996                         if (!tobj) {
1997                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
1998                                                 BYPASS_CAPA);
1999                                 if (!rc) {
2000                                         void *data = NULL;
2001                                         mdd_data_get(env, mdd_tpobj, &data);
2002                                         quota_popc = FSFILT_OP_LINK;
2003                                         mdd_quota_wrapper(la_tmp, qtpids);
2004                                         /* get block quota for target parent */
2005                                         lquota_chkquota(mds_quota_interface_ref,
2006                                                         obd, qtpids,
2007                                                         rec_pending, 1, NULL,
2008                                                         LQUOTA_FLAGS_BLK,
2009                                                         data, 1);
2010                                 }
2011                         }
2012                 }
2013         }
2014 #endif
2015         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
2016         handle = mdd_trans_start(env, mdd);
2017         if (IS_ERR(handle))
2018                 GOTO(out_pending, rc = PTR_ERR(handle));
2019
2020         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2021         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2022         if (rc < 0)
2023                 GOTO(cleanup_unlocked, rc);
2024
2025         /* Get locks in determined order */
2026         if (rc == MDD_RN_SAME) {
2027                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2028                                           sname, MOR_SRC_PARENT);
2029                 /* check hashes to determine do we need one lock or two */
2030                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2031                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2032                                 MOR_TGT_PARENT);
2033                 else
2034                         tdlh = sdlh;
2035         } else if (rc == MDD_RN_SRCTGT) {
2036                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2037                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2038         } else {
2039                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2040                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2041         }
2042         if (sdlh == NULL || tdlh == NULL)
2043                 GOTO(cleanup, rc = -ENOMEM);
2044
2045         mdd_sobj = mdd_object_find(env, mdd, lf);
2046         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2047                                      mdd_sobj, mdd_tobj, ma);
2048         if (rc)
2049                 GOTO(cleanup, rc);
2050
2051         /* Remove source name from source directory */
2052         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2053                                 mdd_object_capa(env, mdd_spobj));
2054         if (rc)
2055                 GOTO(cleanup, rc);
2056
2057         /* "mv dir1 dir2" needs "dir1/.." link update */
2058         if (is_dir && mdd_sobj) {
2059                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2060                                         mdd_object_capa(env, mdd_sobj));
2061                 if (rc)
2062                         GOTO(fixup_spobj2, rc);
2063
2064                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2065                                       handle, mdd_object_capa(env, mdd_sobj));
2066                 if (rc) {
2067                         GOTO(fixup_spobj, rc);
2068                 }
2069         }
2070
2071         /* Remove target name from target directory
2072          * Here tobj can be remote one, so we do index_delete unconditionally
2073          * and -ENOENT is allowed.
2074          */
2075         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2076                                 mdd_object_capa(env, mdd_tpobj));
2077         if (rc != 0) {
2078                 if (mdd_tobj) {
2079                         /* tname might been renamed to something else */
2080                         GOTO(fixup_spobj, rc);
2081                 }
2082                 if (rc != -ENOENT)
2083                         GOTO(fixup_spobj, rc);
2084         }
2085
2086         /* Insert new fid with target name into target dir */
2087         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2088                                 mdd_object_capa(env, mdd_tpobj));
2089         if (rc)
2090                 GOTO(fixup_tpobj, rc);
2091
2092         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2093         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2094
2095         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2096         if (mdd_sobj) {
2097                 la->la_valid = LA_CTIME;
2098                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
2099                                                         handle, 0);
2100                 if (rc)
2101                         GOTO(fixup_tpobj, rc);
2102         }
2103
2104         /* Remove old target object
2105          * For tobj is remote case cmm layer has processed
2106          * and set tobj to NULL then. So when tobj is NOT NULL,
2107          * it must be local one.
2108          */
2109         if (tobj && mdd_object_exists(mdd_tobj)) {
2110                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2111                 if (mdd_is_dead_obj(mdd_tobj)) {
2112                         mdd_write_unlock(env, mdd_tobj);
2113                         /* shld not be dead, something is wrong */
2114                         CERROR("tobj is dead, something is wrong\n");
2115                         rc = -EINVAL;
2116                         goto cleanup;
2117                 }
2118                 __mdd_ref_del(env, mdd_tobj, handle, 0);
2119
2120                 /* Remove dot reference. */
2121                 if (is_dir)
2122                         __mdd_ref_del(env, mdd_tobj, handle, 1);
2123
2124                 la->la_valid = LA_CTIME;
2125                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2126                 if (rc)
2127                         GOTO(fixup_tpobj, rc);
2128
2129                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2130                 mdd_write_unlock(env, mdd_tobj);
2131                 if (rc)
2132                         GOTO(fixup_tpobj, rc);
2133
2134 #ifdef HAVE_QUOTA_SUPPORT
2135                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2136                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
2137                         quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2138                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
2139                 }
2140 #endif
2141         }
2142
2143         la->la_valid = LA_CTIME | LA_MTIME;
2144         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
2145         if (rc)
2146                 GOTO(fixup_tpobj, rc);
2147
2148         if (mdd_spobj != mdd_tpobj) {
2149                 la->la_valid = LA_CTIME | LA_MTIME;
2150                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
2151                                                   handle, 0);
2152         }
2153
2154         if (rc == 0 && mdd_sobj) {
2155                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2156                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2157                                       mdo2fid(mdd_tpobj), ltname, handle);
2158                 if (rc == -ENOENT)
2159                         /* Old files might not have EA entry */
2160                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2161                                       lsname, handle);
2162                 mdd_write_unlock(env, mdd_sobj);
2163                 /* We don't fail the transaction if the link ea can't be
2164                    updated -- fid2path will use alternate lookup method. */
2165                 rc = 0;
2166         }
2167
2168         EXIT;
2169
2170 fixup_tpobj:
2171         if (rc) {
2172                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2173                                          BYPASS_CAPA);
2174                 if (rc2)
2175                         CWARN("tp obj fix error %d\n",rc2);
2176
2177                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2178                     !mdd_is_dead_obj(mdd_tobj)) {
2179                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2180                                          mdo2fid(mdd_tobj), tname,
2181                                          is_dir, handle,
2182                                          BYPASS_CAPA);
2183
2184                         if (rc2)
2185                                 CWARN("tp obj fix error %d\n",rc2);
2186                 }
2187         }
2188
2189 fixup_spobj:
2190         if (rc && is_dir && mdd_sobj) {
2191                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2192                                               BYPASS_CAPA);
2193
2194                 if (rc2)
2195                         CWARN("sp obj dotdot delete error %d\n",rc2);
2196
2197
2198                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2199                                               dotdot, handle, BYPASS_CAPA);
2200                 if (rc2)
2201                         CWARN("sp obj dotdot insert error %d\n",rc2);
2202         }
2203
2204 fixup_spobj2:
2205         if (rc) {
2206                 rc2 = __mdd_index_insert(env, mdd_spobj,
2207                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2208                 if (rc2)
2209                         CWARN("sp obj fix error %d\n",rc2);
2210         }
2211 cleanup:
2212         if (likely(tdlh) && sdlh != tdlh)
2213                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2214         if (likely(sdlh))
2215                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2216 cleanup_unlocked:
2217         if (rc == 0)
2218                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj,
2219                                             mdd_spobj, lf, lsname, handle);
2220         if (rc == 0)
2221                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
2222                                             mdd_tpobj, lf, ltname, handle);
2223
2224         mdd_trans_stop(env, mdd, rc, handle);
2225         if (mdd_sobj)
2226                 mdd_object_put(env, mdd_sobj);
2227 out_pending:
2228 #ifdef HAVE_QUOTA_SUPPORT
2229         if (mds->mds_quota) {
2230                 if (quota_popc)
2231                         lquota_pending_commit(mds_quota_interface_ref, obd,
2232                                               qtpids, rec_pending, 1);
2233
2234                 if (quota_copc) {
2235                         /* Trigger dqrel on the source owner of parent.
2236                          * If failed, the next call for lquota_chkquota will
2237                          * process it. */
2238                         lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2239                                       FSFILT_OP_UNLINK_PARTIAL_PARENT);
2240
2241                         /* Trigger dqrel on the target owner of child.
2242                          * If failed, the next call for lquota_chkquota
2243                          * will process it. */
2244                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2245                                       qtpids, rc, quota_copc);
2246                 }
2247         }
2248 #endif
2249         return rc;
2250 }
2251
2252 /** enable/disable storing of hardlink info */
2253 int mdd_linkea_enable = 1;
2254 CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644,
2255                 "record hardlink info in EAs");
2256
2257 /** Read the link EA into a temp buffer.
2258  * Uses the name_buf since it is generally large.
2259  * \retval IS_ERR err
2260  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2261  */
2262 struct lu_buf *mdd_links_get(const struct lu_env *env,
2263                              struct mdd_object *mdd_obj)
2264 {
2265         struct lu_buf *buf;
2266         struct lustre_capa *capa;
2267         struct link_ea_header *leh;
2268         int rc;
2269
2270         /* First try a small buf */
2271         buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2272         if (buf->lb_buf == NULL)
2273                 return ERR_PTR(-ENOMEM);
2274
2275         capa = mdd_object_capa(env, mdd_obj);
2276         rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2277         if (rc == -ERANGE) {
2278                 /* Buf was too small, figure out what we need. */
2279                 buf->lb_buf = NULL;
2280                 buf->lb_len = 0;
2281                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2282                 if (rc < 0)
2283                         return ERR_PTR(rc);
2284                 buf = mdd_buf_alloc(env, rc);
2285                 if (buf->lb_buf == NULL)
2286                         return ERR_PTR(-ENOMEM);
2287                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2288         }
2289         if (rc < 0)
2290                 return ERR_PTR(rc);
2291
2292         leh = buf->lb_buf;
2293         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2294                 leh->leh_magic = LINK_EA_MAGIC;
2295                 leh->leh_reccount = __swab32(leh->leh_reccount);
2296                 leh->leh_len = __swab64(leh->leh_len);
2297                 /* entries are swabbed by mdd_lee_unpack */
2298         }
2299         if (leh->leh_magic != LINK_EA_MAGIC)
2300                 return ERR_PTR(-EINVAL);
2301         if (leh->leh_reccount == 0)
2302                 return ERR_PTR(-ENODATA);
2303
2304         return buf;
2305 }
2306
2307 /** Pack a link_ea_entry.
2308  * All elements are stored as chars to avoid alignment issues.
2309  * Numbers are always big-endian
2310  * \param packbuf is a temp fid buffer
2311  * \retval record length
2312  */
2313 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2314                          const struct lu_fid *pfid, struct lu_fid* packbuf)
2315 {
2316         char *ptr;
2317         int reclen;
2318
2319         fid_pack(&lee->lee_parent_fid, pfid, packbuf);
2320         ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len;
2321         strncpy(ptr, lname->ln_name, lname->ln_namelen);
2322         reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen +
2323                 sizeof(lee->lee_reclen);
2324         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2325         lee->lee_reclen[1] = reclen & 0xff;
2326         return reclen;
2327 }
2328
2329 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2330                     struct lu_name *lname, struct lu_fid *pfid)
2331 {
2332         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2333         fid_unpack(&lee->lee_parent_fid, pfid);
2334         lname->ln_name = (char *)&lee->lee_parent_fid +
2335                 lee->lee_parent_fid.fp_len;
2336         lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len -
2337                 sizeof(lee->lee_reclen);
2338 }
2339
2340 /** Add a record to the end of link ea buf */
2341 static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
2342                            const struct lu_fid *pfid,
2343                            const struct lu_name *lname)
2344 {
2345         struct link_ea_header *leh;
2346         struct link_ea_entry *lee;
2347         int reclen;
2348
2349         if (lname == NULL || pfid == NULL)
2350                 return -EINVAL;
2351
2352         /* Make sure our buf is big enough for the new one */
2353         leh = buf->lb_buf;
2354         reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2355         if (leh->leh_len + reclen > buf->lb_len) {
2356                 if (mdd_buf_grow(env, leh->leh_len + reclen) < 0)
2357                         return -ENOMEM;
2358         }
2359
2360         leh = buf->lb_buf;
2361         lee = buf->lb_buf + leh->leh_len;
2362         reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2);
2363         leh->leh_len += reclen;
2364         leh->leh_reccount++;
2365         return 0;
2366 }
2367
2368 /* For pathologic linkers, we don't want to spend lots of time scanning the
2369  * link ea.  Limit ourseleves to something reasonable; links not in the EA
2370  * can be looked up via (slower) parent lookup.
2371  */
2372 #define LINKEA_MAX_COUNT 128
2373
2374 static int mdd_links_add(const struct lu_env *env,
2375                          struct mdd_object *mdd_obj,
2376                          const struct lu_fid *pfid,
2377                          const struct lu_name *lname,
2378                          struct thandle *handle)
2379 {
2380         struct lu_buf *buf;
2381         struct link_ea_header *leh;
2382         int rc;
2383         ENTRY;
2384
2385         if (!mdd_linkea_enable)
2386                 RETURN(0);
2387
2388         buf = mdd_links_get(env, mdd_obj);
2389         if (IS_ERR(buf)) {
2390                 rc = PTR_ERR(buf);
2391                 if (rc != -ENODATA) {
2392                         CERROR("link_ea read failed %d "DFID"\n", rc,
2393                                PFID(mdd_object_fid(mdd_obj)));
2394                         RETURN (rc);
2395                 }
2396                 /* empty EA; start one */
2397                 buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2398                 if (buf->lb_buf == NULL)
2399                         RETURN(-ENOMEM);
2400                 leh = buf->lb_buf;
2401                 leh->leh_magic = LINK_EA_MAGIC;
2402                 leh->leh_len = sizeof(struct link_ea_header);
2403                 leh->leh_reccount = 0;
2404         }
2405
2406         leh = buf->lb_buf;
2407         if (leh->leh_reccount > LINKEA_MAX_COUNT)
2408                 RETURN(-EOVERFLOW);
2409
2410         rc = __mdd_links_add(env, buf, pfid, lname);
2411         if (rc)
2412                 RETURN(rc);
2413
2414         leh = buf->lb_buf;
2415         rc = __mdd_xattr_set(env, mdd_obj,
2416                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2417                              XATTR_NAME_LINK, 0, handle);
2418         if (rc)
2419                 CERROR("link_ea add failed %d "DFID"\n", rc,
2420                        PFID(mdd_object_fid(mdd_obj)));
2421
2422         if (buf->lb_vmalloc)
2423                 /* if we vmalloced a large buffer drop it */
2424                 mdd_buf_put(buf);
2425
2426         RETURN (rc);
2427 }
2428
2429 static int mdd_links_rename(const struct lu_env *env,
2430                             struct mdd_object *mdd_obj,
2431                             const struct lu_fid *oldpfid,
2432                             const struct lu_name *oldlname,
2433                             const struct lu_fid *newpfid,
2434                             const struct lu_name *newlname,
2435                             struct thandle *handle)
2436 {
2437         struct lu_buf  *buf;
2438         struct link_ea_header *leh;
2439         struct link_ea_entry  *lee;
2440         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
2441         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2442         int reclen = 0;
2443         int count;
2444         int rc, rc2 = 0;
2445         ENTRY;
2446
2447         if (!mdd_linkea_enable)
2448                 RETURN(0);
2449
2450         if (mdd_obj->mod_flags & DEAD_OBJ)
2451                 /* No more links, don't bother */
2452                 RETURN(0);
2453
2454         buf = mdd_links_get(env, mdd_obj);
2455         if (IS_ERR(buf)) {
2456                 rc = PTR_ERR(buf);
2457                 CERROR("link_ea read failed %d "DFID"\n",
2458                        rc, PFID(mdd_object_fid(mdd_obj)));
2459                 RETURN(rc);
2460         }
2461         leh = buf->lb_buf;
2462         lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
2463
2464         /* Find the old record */
2465         for(count = 0; count <= leh->leh_reccount; count++) {
2466                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
2467                 if (tmpname->ln_namelen == oldlname->ln_namelen &&
2468                     lu_fid_eq(tmpfid, oldpfid) &&
2469                     (strncmp(tmpname->ln_name, oldlname->ln_name,
2470                              tmpname->ln_namelen) == 0))
2471                         break;
2472                 lee = (struct link_ea_entry *)((char *)lee + reclen);
2473         }
2474         if (count > leh->leh_reccount) {
2475                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2476                        oldlname->ln_namelen, oldlname->ln_name);
2477                 GOTO(out, rc = -ENOENT);
2478         }
2479
2480         /* Remove the old record */
2481         leh->leh_reccount--;
2482         leh->leh_len -= reclen;
2483         memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len -
2484                 (char *)lee);
2485
2486         /* If renaming, add the new record */
2487         if (newpfid != NULL) {
2488                 /* if the add fails, we still delete the out-of-date old link */
2489                 rc2 = __mdd_links_add(env, buf, newpfid, newlname);
2490                 leh = buf->lb_buf;
2491         }
2492
2493         rc = __mdd_xattr_set(env, mdd_obj,
2494                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2495                              XATTR_NAME_LINK, 0, handle);
2496
2497 out:
2498         if (rc == 0)
2499                 rc = rc2;
2500         if (rc)
2501                 CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n",
2502                        oldlname->ln_namelen, oldlname->ln_name, rc,
2503                        PFID(mdd_object_fid(mdd_obj)));
2504
2505         if (buf->lb_vmalloc)
2506                 /* if we vmalloced a large buffer drop it */
2507                 mdd_buf_put(buf);
2508
2509         RETURN (rc);
2510 }
2511
2512 const struct md_dir_operations mdd_dir_ops = {
2513         .mdo_is_subdir     = mdd_is_subdir,
2514         .mdo_lookup        = mdd_lookup,
2515         .mdo_create        = mdd_create,
2516         .mdo_rename        = mdd_rename,
2517         .mdo_link          = mdd_link,
2518         .mdo_unlink        = mdd_unlink,
2519         .mdo_name_insert   = mdd_name_insert,
2520         .mdo_name_remove   = mdd_name_remove,
2521         .mdo_rename_tgt    = mdd_rename_tgt,
2522         .mdo_create_data   = mdd_create_data
2523 };