Whamcloud - gitweb
b=16721
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int mdd_links_add(const struct lu_env *env,
75                          struct mdd_object *mdd_obj,
76                          const struct lu_fid *pfid,
77                          const struct lu_name *lname,
78                          struct thandle *handle);
79 static int mdd_links_rename(const struct lu_env *env,
80                             struct mdd_object *mdd_obj,
81                             const struct lu_fid *oldpfid,
82                             const struct lu_name *oldlname,
83                             const struct lu_fid *newpfid,
84                             const struct lu_name *newlname,
85                             struct thandle *handle);
86
87 static int
88 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
89                     const struct lu_name *lname, struct lu_fid* fid, int mask)
90 {
91         const char *name = lname->ln_name;
92         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
93         struct dynlock_handle *dlh;
94         int rc;
95
96         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
97         if (unlikely(dlh == NULL))
98                 return -ENOMEM;
99         rc = __mdd_lookup(env, pobj, lname, fid, mask);
100         mdd_pdo_read_unlock(env, mdd_obj, dlh);
101
102         return rc;
103 }
104
105 int mdd_lookup(const struct lu_env *env,
106                struct md_object *pobj, const struct lu_name *lname,
107                struct lu_fid* fid, struct md_op_spec *spec)
108 {
109         int rc;
110         ENTRY;
111         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
112         RETURN(rc);
113 }
114
115 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
116                           struct lu_fid *fid)
117 {
118         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
119 }
120
121 /*
122  * For root fid use special function, which does not compare version component
123  * of fid. Version component is different for root fids on all MDTs.
124  */
125 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
126 {
127         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
128                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
129 }
130
131 /*
132  * return 1: if lf is the fid of the ancestor of p1;
133  * return 0: if not;
134  *
135  * return -EREMOTE: if remote object is found, in this
136  * case fid of remote object is saved to @pf;
137  *
138  * otherwise: values < 0, errors.
139  */
140 static int mdd_is_parent(const struct lu_env *env,
141                          struct mdd_device *mdd,
142                          struct mdd_object *p1,
143                          const struct lu_fid *lf,
144                          struct lu_fid *pf)
145 {
146         struct mdd_object *parent = NULL;
147         struct lu_fid *pfid;
148         int rc;
149         ENTRY;
150
151         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
152         pfid = &mdd_env_info(env)->mti_fid;
153
154         /* Check for root first. */
155         if (mdd_is_root(mdd, mdo2fid(p1)))
156                 RETURN(0);
157
158         for(;;) {
159                 /* this is done recursively, bypass capa for each obj */
160                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
161                 rc = mdd_parent_fid(env, p1, pfid);
162                 if (rc)
163                         GOTO(out, rc);
164                 if (mdd_is_root(mdd, pfid))
165                         GOTO(out, rc = 0);
166                 if (lu_fid_eq(pfid, lf))
167                         GOTO(out, rc = 1);
168                 if (parent)
169                         mdd_object_put(env, parent);
170                 parent = mdd_object_find(env, mdd, pfid);
171
172                 /* cross-ref parent */
173                 if (parent == NULL) {
174                         if (pf != NULL)
175                                 *pf = *pfid;
176                         GOTO(out, rc = -EREMOTE);
177                 } else if (IS_ERR(parent))
178                         GOTO(out, rc = PTR_ERR(parent));
179                 p1 = parent;
180         }
181         EXIT;
182 out:
183         if (parent && !IS_ERR(parent))
184                 mdd_object_put(env, parent);
185         return rc;
186 }
187
188 /*
189  * No permission check is needed.
190  *
191  * returns 1: if fid is ancestor of @mo;
192  * returns 0: if fid is not a ancestor of @mo;
193  *
194  * returns EREMOTE if remote object is found, fid of remote object is saved to
195  * @fid;
196  *
197  * returns < 0: if error
198  */
199 static int mdd_is_subdir(const struct lu_env *env,
200                          struct md_object *mo, const struct lu_fid *fid,
201                          struct lu_fid *sfid)
202 {
203         struct mdd_device *mdd = mdo2mdd(mo);
204         int rc;
205         ENTRY;
206
207         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
208                 RETURN(0);
209
210         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
211         if (rc == 0) {
212                 /* found root */
213                 fid_zero(sfid);
214         } else if (rc == 1) {
215                 /* found @fid is parent */
216                 *sfid = *fid;
217                 rc = 0;
218         }
219         RETURN(rc);
220 }
221
222 /*
223  * Check that @dir contains no entries except (possibly) dot and dotdot.
224  *
225  * Returns:
226  *
227  *             0        empty
228  *      -ENOTDIR        not a directory object
229  *    -ENOTEMPTY        not empty
230  *           -ve        other error
231  *
232  */
233 static int mdd_dir_is_empty(const struct lu_env *env,
234                             struct mdd_object *dir)
235 {
236         struct dt_it     *it;
237         struct dt_object *obj;
238         const struct dt_it_ops *iops;
239         int result;
240         ENTRY;
241
242         obj = mdd_object_child(dir);
243         if (!dt_try_as_dir(env, obj))
244                 RETURN(-ENOTDIR);
245
246         iops = &obj->do_index_ops->dio_it;
247         it = iops->init(env, obj, BYPASS_CAPA);
248         if (it != NULL) {
249                 result = iops->get(env, it, (const void *)"");
250                 if (result > 0) {
251                         int i;
252                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
253                                 result = iops->next(env, it);
254                         if (result == 0)
255                                 result = -ENOTEMPTY;
256                         else if (result == +1)
257                                 result = 0;
258                 } else if (result == 0)
259                         /*
260                          * Huh? Index contains no zero key?
261                          */
262                         result = -EIO;
263
264                 iops->put(env, it);
265                 iops->fini(env, it);
266         } else
267                 result = -ENOMEM;
268         RETURN(result);
269 }
270
271 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
272 {
273         struct mdd_device *m = mdd_obj2mdd_dev(obj);
274         struct lu_attr *la = &mdd_env_info(env)->mti_la;
275         int rc;
276         ENTRY;
277
278         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
279         if (rc)
280                 RETURN(rc);
281
282         /*
283          * Subdir count limitation can be broken through.
284          */
285         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
286             !S_ISDIR(la->la_mode))
287                 RETURN(-EMLINK);
288         else
289                 RETURN(0);
290 }
291
292 /*
293  * Check whether it may create the cobj under the pobj.
294  * cobj maybe NULL
295  */
296 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
297                    struct mdd_object *cobj, int check_perm, int check_nlink)
298 {
299         int rc = 0;
300         ENTRY;
301
302         if (cobj && mdd_object_exists(cobj))
303                 RETURN(-EEXIST);
304
305         if (mdd_is_dead_obj(pobj))
306                 RETURN(-ENOENT);
307
308         if (check_perm)
309                 rc = mdd_permission_internal_locked(env, pobj, NULL,
310                                                     MAY_WRITE | MAY_EXEC,
311                                                     MOR_TGT_PARENT);
312
313         if (!rc && check_nlink)
314                 rc = __mdd_may_link(env, pobj);
315
316         RETURN(rc);
317 }
318
319 /*
320  * Check whether can unlink from the pobj in the case of "cobj == NULL".
321  */
322 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
323                    const struct md_attr *ma)
324 {
325         int rc;
326         ENTRY;
327
328         if (mdd_is_dead_obj(pobj))
329                 RETURN(-ENOENT);
330
331         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
332             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
333                 RETURN(-EPERM);
334
335         rc = mdd_permission_internal_locked(env, pobj, NULL,
336                                             MAY_WRITE | MAY_EXEC,
337                                             MOR_TGT_PARENT);
338         if (rc)
339                 RETURN(rc);
340
341         if (mdd_is_append(pobj))
342                 RETURN(-EPERM);
343
344         RETURN(rc);
345 }
346
347 /*
348  * pobj == NULL is remote ops case, under such case, pobj's
349  * VTX feature has been checked already, no need check again.
350  */
351 static inline int mdd_is_sticky(const struct lu_env *env,
352                                 struct mdd_object *pobj,
353                                 struct mdd_object *cobj)
354 {
355         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
356         struct md_ucred *uc = md_ucred(env);
357         int rc;
358
359         if (pobj) {
360                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
361                 if (rc)
362                         return rc;
363
364                 if (!(tmp_la->la_mode & S_ISVTX) ||
365                      (tmp_la->la_uid == uc->mu_fsuid))
366                         return 0;
367         }
368
369         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
370         if (rc)
371                 return rc;
372
373         if (tmp_la->la_uid == uc->mu_fsuid)
374                 return 0;
375
376         return !mdd_capable(uc, CFS_CAP_FOWNER);
377 }
378
379 /*
380  * Check whether it may delete the cobj from the pobj.
381  * pobj maybe NULL
382  */
383 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
384                    struct mdd_object *cobj, struct md_attr *ma,
385                    int check_perm, int check_empty)
386 {
387         int rc = 0;
388         ENTRY;
389
390         LASSERT(cobj);
391         if (!mdd_object_exists(cobj))
392                 RETURN(-ENOENT);
393
394         if (pobj) {
395                 if (mdd_is_dead_obj(pobj))
396                         RETURN(-ENOENT);
397
398                 if (check_perm) {
399                         rc = mdd_permission_internal_locked(env, pobj, NULL,
400                                                     MAY_WRITE | MAY_EXEC,
401                                                     MOR_TGT_PARENT);
402                         if (rc)
403                                 RETURN(rc);
404                 }
405
406                 if (mdd_is_append(pobj))
407                         RETURN(-EPERM);
408         }
409
410         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
411             mdd_is_sticky(env, pobj, cobj))
412                 RETURN(-EPERM);
413
414         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
415                 RETURN(-EPERM);
416
417         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
418             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
419                 RETURN(-EPERM);
420
421         if (S_ISDIR(ma->ma_attr.la_mode)) {
422                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
423
424                 if (!S_ISDIR(mdd_object_type(cobj)))
425                         RETURN(-ENOTDIR);
426
427                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
428                         RETURN(-EBUSY);
429         } else if (S_ISDIR(mdd_object_type(cobj)))
430                 RETURN(-EISDIR);
431
432         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
433                 rc = mdd_dir_is_empty(env, cobj);
434
435         RETURN(rc);
436 }
437
438 /*
439  * tgt maybe NULL
440  * has mdd_write_lock on src already, but not on tgt yet
441  */
442 int mdd_link_sanity_check(const struct lu_env *env,
443                           struct mdd_object *tgt_obj,
444                           const struct lu_name *lname,
445                           struct mdd_object *src_obj)
446 {
447         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
448         int rc = 0;
449         ENTRY;
450
451         if (mdd_is_dead_obj(src_obj))
452                 RETURN(-ESTALE);
453
454         /* Local ops, no lookup before link, check filename length here. */
455         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
456                 RETURN(-ENAMETOOLONG);
457
458         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
459                 RETURN(-EPERM);
460
461         if (S_ISDIR(mdd_object_type(src_obj)))
462                 RETURN(-EPERM);
463
464         LASSERT(src_obj != tgt_obj);
465         if (tgt_obj) {
466                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
467                 if (rc)
468                         RETURN(rc);
469         }
470
471         rc = __mdd_may_link(env, src_obj);
472
473         RETURN(rc);
474 }
475
476 /**
477  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
478  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
479  * (maybe too large to be represented). It is a trick to break through the
480  * "i_nlink" limitation for subdir count.
481  */
482 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
483                    struct thandle *handle)
484 {
485         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
486         struct mdd_device *m = mdd_obj2mdd_dev(obj);
487
488         if (!mdd_is_mnlink(obj)) {
489                 if (S_ISDIR(mdd_object_type(obj))) {
490                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
491                                 return;
492
493                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
494                                 obj->mod_flags |= MNLINK_OBJ;
495                                 tmp_la->la_nlink = 1;
496                                 tmp_la->la_valid = LA_NLINK;
497                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
498                                                       0);
499                                 return;
500                         }
501                 }
502                 mdo_ref_add(env, obj, handle);
503         }
504 }
505
506 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
507                    struct thandle *handle, int is_dot)
508 {
509         if (!mdd_is_mnlink(obj) || is_dot)
510                 mdo_ref_del(env, obj, handle);
511 }
512
513 /* insert named index, add reference if isdir */
514 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
515                               const struct lu_fid *lf, const char *name, int is_dir,
516                               struct thandle *handle, struct lustre_capa *capa)
517 {
518         struct dt_object *next = mdd_object_child(pobj);
519         int               rc;
520         ENTRY;
521
522         if (dt_try_as_dir(env, next)) {
523                 struct md_ucred  *uc = md_ucred(env);
524
525                 rc = next->do_index_ops->dio_insert(env, next,
526                                                     __mdd_fid_rec(env, lf),
527                                                     (const struct dt_key *)name,
528                                                     handle, capa, uc->mu_cap &
529                                                     CFS_CAP_SYS_RESOURCE_MASK);
530         } else {
531                 rc = -ENOTDIR;
532         }
533
534         if (rc == 0) {
535                 if (is_dir) {
536                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
537                         __mdd_ref_add(env, pobj, handle);
538                         mdd_write_unlock(env, pobj);
539                 }
540         }
541         RETURN(rc);
542 }
543
544 /* delete named index, drop reference if isdir */
545 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
546                               const char *name, int is_dir, struct thandle *handle,
547                               struct lustre_capa *capa)
548 {
549         struct dt_object *next = mdd_object_child(pobj);
550         int               rc;
551         ENTRY;
552
553         if (dt_try_as_dir(env, next)) {
554                 rc = next->do_index_ops->dio_delete(env, next,
555                                                     (struct dt_key *)name,
556                                                     handle, capa);
557                 if (rc == 0 && is_dir) {
558                         int is_dot = 0;
559
560                         if (name != NULL && name[0] == '.' && name[1] == 0)
561                                 is_dot = 1;
562                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
563                         __mdd_ref_del(env, pobj, handle, is_dot);
564                         mdd_write_unlock(env, pobj);
565                 }
566         } else
567                 rc = -ENOTDIR;
568
569         RETURN(rc);
570 }
571
572 static int
573 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
574                         const struct lu_fid *lf, const char *name,
575                         struct thandle *handle, struct lustre_capa *capa)
576 {
577         struct dt_object *next = mdd_object_child(pobj);
578         int               rc;
579         ENTRY;
580
581         if (dt_try_as_dir(env, next)) {
582                 struct md_ucred  *uc = md_ucred(env);
583
584                 rc = next->do_index_ops->dio_insert(env, next,
585                                                     __mdd_fid_rec(env, lf),
586                                                     (const struct dt_key *)name,
587                                                     handle, capa, uc->mu_cap &
588                                                     CFS_CAP_SYS_RESOURCE_MASK);
589         } else {
590                 rc = -ENOTDIR;
591         }
592         RETURN(rc);
593 }
594
595 /** Store a namespace change changelog record
596  * If this fails, we must fail the whole transaction; we don't
597  * want the change to commit without the log entry.
598  * \param target - mdd_object of change
599  * \param parent - parent dir/object
600  * \param tf - target lu_fid, overrides fid of \a target if this is non-null
601  * \param tname - target name string
602  * \param handle - transacion handle
603  */
604 static int mdd_changelog_ns_store(const struct lu_env  *env,
605                                   struct mdd_device    *mdd,
606                                   enum changelog_rec_type type,
607                                   struct mdd_object    *target,
608                                   struct mdd_object    *parent,
609                                   const struct lu_fid  *tf,
610                                   const struct lu_name *tname,
611                                   struct thandle *handle)
612 {
613         const struct lu_fid *tfid;
614         const struct lu_fid *tpfid = mdo2fid(parent);
615         struct llog_changelog_rec *rec;
616         struct lu_buf *buf;
617         int reclen;
618         int rc;
619         ENTRY;
620
621         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
622                 RETURN(0);
623
624         LASSERT(parent != NULL);
625         LASSERT(tname != NULL);
626         LASSERT(handle != NULL);
627
628         /* target */
629         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
630         buf = mdd_buf_alloc(env, reclen);
631         if (buf->lb_buf == NULL)
632                 RETURN(-ENOMEM);
633         rec = (struct llog_changelog_rec *)buf->lb_buf;
634
635         rec->cr_flags = CLF_VERSION;
636         rec->cr_type = (__u32)type;
637         tfid = tf ? tf : mdo2fid(target);
638         rec->cr_tfid = *tfid;
639         rec->cr_pfid = *tpfid;
640         rec->cr_namelen = tname->ln_namelen;
641         memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen);
642         if (likely(target))
643                 target->mod_cltime = cfs_time_current_64();
644
645         rc = mdd_changelog_llog_write(mdd, rec, handle);
646         if (rc < 0) {
647                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
648                        rc, type, tname->ln_name, PFID(tfid), PFID(tpfid));
649                 return -EFAULT;
650         }
651
652         return 0;
653 }
654
655 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
656                     struct md_object *src_obj, const struct lu_name *lname,
657                     struct md_attr *ma)
658 {
659         const char *name = lname->ln_name;
660         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
661         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
662         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
663         struct mdd_device *mdd = mdo2mdd(src_obj);
664         struct dynlock_handle *dlh;
665         struct thandle *handle;
666 #ifdef HAVE_QUOTA_SUPPORT
667         struct obd_device *obd = mdd->mdd_obd_dev;
668         struct mds_obd *mds = &obd->u.mds;
669         unsigned int qids[MAXQUOTAS] = { 0, 0 };
670         int quota_opc = 0, rec_pending = 0;
671 #endif
672         int rc;
673         ENTRY;
674
675 #ifdef HAVE_QUOTA_SUPPORT
676         if (mds->mds_quota) {
677                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
678
679                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
680                 if (!rc) {
681                         void *data = NULL;
682                         mdd_data_get(env, mdd_tobj, &data);
683                         quota_opc = FSFILT_OP_LINK;
684                         mdd_quota_wrapper(la_tmp, qids);
685                         /* get block quota for parent */
686                         lquota_chkquota(mds_quota_interface_ref, obd,
687                                         qids[USRQUOTA], qids[GRPQUOTA], 1,
688                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK,
689                                         data, 1);
690                 }
691         }
692 #endif
693
694         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
695         handle = mdd_trans_start(env, mdd);
696         if (IS_ERR(handle))
697                 GOTO(out_pending, rc = PTR_ERR(handle));
698
699         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
700         if (dlh == NULL)
701                 GOTO(out_trans, rc = -ENOMEM);
702         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
703
704         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
705         if (rc)
706                 GOTO(out_unlock, rc);
707
708         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
709                                      name, handle,
710                                      mdd_object_capa(env, mdd_tobj));
711         if (rc)
712                 GOTO(out_unlock, rc);
713
714         __mdd_ref_add(env, mdd_sobj, handle);
715
716         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
717         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
718
719         la->la_valid = LA_CTIME | LA_MTIME;
720         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
721         if (rc)
722                 GOTO(out_unlock, rc);
723
724         la->la_valid = LA_CTIME;
725         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
726         if (rc == 0)
727                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle);
728
729         EXIT;
730 out_unlock:
731         mdd_write_unlock(env, mdd_sobj);
732         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
733 out_trans:
734         if (rc == 0)
735                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj,
736                                             mdd_tobj, NULL, lname, handle);
737         mdd_trans_stop(env, mdd, rc, handle);
738 out_pending:
739 #ifdef HAVE_QUOTA_SUPPORT
740         if (quota_opc) {
741                 if (rec_pending)
742                         lquota_pending_commit(mds_quota_interface_ref, obd,
743                                               qids[USRQUOTA], qids[GRPQUOTA],
744                                               rec_pending, 1);
745                 /* Trigger dqacq for the parent owner. If failed,
746                  * the next call for lquota_chkquota will process it. */
747                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
748                               quota_opc);
749         }
750 #endif
751         return rc;
752 }
753
754 /* caller should take a lock before calling */
755 int mdd_finish_unlink(const struct lu_env *env,
756                       struct mdd_object *obj, struct md_attr *ma,
757                       struct thandle *th)
758 {
759         int rc;
760         int reset = 1;
761         ENTRY;
762
763         rc = mdd_iattr_get(env, obj, ma);
764         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
765                 /* add new orphan and the object
766                  * will be deleted during mdd_close() */
767                 if (obj->mod_count) {
768                         rc = __mdd_orphan_add(env, obj, th);
769                         if (rc == 0) {
770                                 obj->mod_flags |= ORPHAN_OBJ;
771                                 CDEBUG(D_HA, "Object "DFID" is going to be "
772                                         "an orphan, open count = %d\n",
773                                         PFID(mdd_object_fid(obj)),
774                                         obj->mod_count);
775                         }
776                 }
777
778                 obj->mod_flags |= DEAD_OBJ;
779                 if (!(obj->mod_flags & ORPHAN_OBJ)) {
780                         rc = mdd_object_kill(env, obj, ma);
781                         if (rc == 0)
782                                 reset = 0;
783                 }
784
785         }
786         if (reset)
787                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
788
789         RETURN(rc);
790 }
791
792 /*
793  * pobj maybe NULL
794  * has mdd_write_lock on cobj already, but not on pobj yet
795  */
796 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
797                             struct mdd_object *cobj, struct md_attr *ma)
798 {
799         int rc;
800         ENTRY;
801
802         if (mdd_is_dead_obj(cobj))
803                 RETURN(-ESTALE);
804
805         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
806
807         RETURN(rc);
808 }
809
810 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
811                       struct md_object *cobj, const struct lu_name *lname,
812                       struct md_attr *ma)
813 {
814         const char *name = lname->ln_name;
815         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
816         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
817         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
818         struct mdd_device *mdd = mdo2mdd(pobj);
819         struct dynlock_handle *dlh;
820         struct thandle    *handle;
821 #ifdef HAVE_QUOTA_SUPPORT
822         struct obd_device *obd = mdd->mdd_obd_dev;
823         struct mds_obd *mds = &obd->u.mds;
824         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
825         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
826         int quota_opc = 0;
827 #endif
828         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
829         int rc;
830         ENTRY;
831
832         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
833                  PFID(mdd_object_fid(mdd_cobj)));
834
835         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
836         if (rc)
837                 RETURN(rc);
838
839         handle = mdd_trans_start(env, mdd);
840         if (IS_ERR(handle))
841                 RETURN(PTR_ERR(handle));
842
843         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
844         if (dlh == NULL)
845                 GOTO(out_trans, rc = -ENOMEM);
846         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
847
848         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
849         if (rc)
850                 GOTO(cleanup, rc);
851
852         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
853                                 mdd_object_capa(env, mdd_pobj));
854         if (rc)
855                 GOTO(cleanup, rc);
856
857         __mdd_ref_del(env, mdd_cobj, handle, 0);
858         if (is_dir)
859                 /* unlink dot */
860                 __mdd_ref_del(env, mdd_cobj, handle, 1);
861
862         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
863         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
864
865         la->la_valid = LA_CTIME | LA_MTIME;
866         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
867         if (rc)
868                 GOTO(cleanup, rc);
869
870         la->la_valid = LA_CTIME;
871         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
872         if (rc)
873                 GOTO(cleanup, rc);
874
875         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
876 #ifdef HAVE_QUOTA_SUPPORT
877         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
878             ma->ma_attr.la_nlink == 0) {
879                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
880
881                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
882                 if (!rc) {
883                         mdd_quota_wrapper(la_tmp, qpids);
884                         if (mdd_cobj->mod_count == 0) {
885                                 quota_opc = FSFILT_OP_UNLINK;
886                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
887                         } else {
888                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
889                         }
890                 }
891         }
892 #endif
893
894         if (rc == 0)
895                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
896                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
897                                    NULL, NULL);
898         if (!is_dir)
899                 /* old files may not have link ea; ignore errors */
900                 mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj),
901                                  lname, NULL, NULL, handle);
902
903         EXIT;
904 cleanup:
905         mdd_write_unlock(env, mdd_cobj);
906         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
907 out_trans:
908         if (rc == 0)
909                 rc = mdd_changelog_ns_store(env, mdd,
910                                             is_dir ? CL_RMDIR : CL_UNLINK,
911                                             mdd_cobj, mdd_pobj, NULL, lname,
912                                             handle);
913
914         mdd_trans_stop(env, mdd, rc, handle);
915 #ifdef HAVE_QUOTA_SUPPORT
916         if (quota_opc)
917                 /* Trigger dqrel on the owner of child and parent. If failed,
918                  * the next call for lquota_chkquota will process it. */
919                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
920                               quota_opc);
921 #endif
922         return rc;
923 }
924
925 /* has not lock on pobj yet */
926 static int mdd_ni_sanity_check(const struct lu_env *env,
927                                struct md_object *pobj,
928                                const struct md_attr *ma)
929 {
930         struct mdd_object *obj = md2mdd_obj(pobj);
931         int rc;
932         ENTRY;
933
934         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
935                 RETURN(0);
936
937         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
938
939         RETURN(rc);
940 }
941
942 /*
943  * Partial operation.
944  */
945 static int mdd_name_insert(const struct lu_env *env,
946                            struct md_object *pobj,
947                            const struct lu_name *lname,
948                            const struct lu_fid *fid,
949                            const struct md_attr *ma)
950 {
951         const char *name = lname->ln_name;
952         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
953         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
954         struct mdd_device *mdd = mdo2mdd(pobj);
955         struct dynlock_handle *dlh;
956         struct thandle *handle;
957         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
958 #ifdef HAVE_QUOTA_SUPPORT
959         struct md_ucred *uc = md_ucred(env);
960         struct obd_device *obd = mdd->mdd_obd_dev;
961         struct mds_obd *mds = &obd->u.mds;
962         unsigned int qids[MAXQUOTAS] = { 0, 0 };
963         int quota_opc = 0, rec_pending = 0;
964         cfs_cap_t save = uc->mu_cap;
965 #endif
966         int rc;
967         ENTRY;
968
969 #ifdef HAVE_QUOTA_SUPPORT
970         if (mds->mds_quota) {
971                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
972                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
973
974                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
975                         if (!rc) {
976                                 void *data = NULL;
977                                 mdd_data_get(env, mdd_obj, &data);
978                                 quota_opc = FSFILT_OP_LINK;
979                                 mdd_quota_wrapper(la_tmp, qids);
980                                 /* get block quota for parent */
981                                 lquota_chkquota(mds_quota_interface_ref, obd,
982                                                 qids[USRQUOTA], qids[GRPQUOTA],
983                                                 1, &rec_pending, NULL,
984                                                 LQUOTA_FLAGS_BLK, data, 1);
985                         }
986                 } else {
987                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
988                 }
989         }
990 #endif
991         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
992         handle = mdd_trans_start(env, mdo2mdd(pobj));
993         if (IS_ERR(handle))
994                 GOTO(out_pending, rc = PTR_ERR(handle));
995
996         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
997         if (dlh == NULL)
998                 GOTO(out_trans, rc = -ENOMEM);
999
1000         rc = mdd_ni_sanity_check(env, pobj, ma);
1001         if (rc)
1002                 GOTO(out_unlock, rc);
1003
1004         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
1005                                 handle, BYPASS_CAPA);
1006         if (rc)
1007                 GOTO(out_unlock, rc);
1008
1009         /*
1010          * For some case, no need update obj's ctime (LA_CTIME is not set),
1011          * e.g. split_dir.
1012          * For other cases, update obj's ctime (LA_CTIME is set),
1013          * e.g. cmr_link.
1014          */
1015         if (ma->ma_attr.la_valid & LA_CTIME) {
1016                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1017                 la->la_valid = LA_CTIME | LA_MTIME;
1018                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1019                                                         handle, 0);
1020         }
1021         EXIT;
1022 out_unlock:
1023         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1024 out_trans:
1025         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
1026 out_pending:
1027 #ifdef HAVE_QUOTA_SUPPORT
1028         if (mds->mds_quota) {
1029                 if (quota_opc) {
1030                         if (rec_pending)
1031                                 lquota_pending_commit(mds_quota_interface_ref,
1032                                                       obd, qids[USRQUOTA],
1033                                                       qids[GRPQUOTA],
1034                                                       rec_pending, 1);
1035                         /* Trigger dqacq for the parent owner. If failed,
1036                          * the next call for lquota_chkquota will process it*/
1037                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
1038                                       rc, quota_opc);
1039                 } else {
1040                         uc->mu_cap = save;
1041                 }
1042         }
1043 #endif
1044         return rc;
1045 }
1046
1047 /* has not lock on pobj yet */
1048 static int mdd_nr_sanity_check(const struct lu_env *env,
1049                                struct md_object *pobj,
1050                                const struct md_attr *ma)
1051 {
1052         struct mdd_object *obj = md2mdd_obj(pobj);
1053         int rc;
1054         ENTRY;
1055
1056         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
1057                 RETURN(0);
1058
1059         rc = mdd_may_unlink(env, obj, ma);
1060
1061         RETURN(rc);
1062 }
1063
1064 /*
1065  * Partial operation.
1066  */
1067 static int mdd_name_remove(const struct lu_env *env,
1068                            struct md_object *pobj,
1069                            const struct lu_name *lname,
1070                            const struct md_attr *ma)
1071 {
1072         const char *name = lname->ln_name;
1073         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1074         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1075         struct mdd_device *mdd = mdo2mdd(pobj);
1076         struct dynlock_handle *dlh;
1077         struct thandle *handle;
1078         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1079 #ifdef HAVE_QUOTA_SUPPORT
1080         struct obd_device *obd = mdd->mdd_obd_dev;
1081         struct mds_obd *mds = &obd->u.mds;
1082         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1083         int quota_opc = 0;
1084 #endif
1085         int rc;
1086         ENTRY;
1087
1088 #ifdef HAVE_QUOTA_SUPPORT
1089         if (mds->mds_quota) {
1090                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1091
1092                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1093                 if (!rc) {
1094                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
1095                         mdd_quota_wrapper(la_tmp, qids);
1096                 }
1097         }
1098 #endif
1099         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
1100         handle = mdd_trans_start(env, mdd);
1101         if (IS_ERR(handle))
1102                 GOTO(out_pending, rc = PTR_ERR(handle));
1103
1104         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1105         if (dlh == NULL)
1106                 GOTO(out_trans, rc = -ENOMEM);
1107
1108         rc = mdd_nr_sanity_check(env, pobj, ma);
1109         if (rc)
1110                 GOTO(out_unlock, rc);
1111
1112         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1113                                 handle, BYPASS_CAPA);
1114         if (rc)
1115                 GOTO(out_unlock, rc);
1116
1117         /*
1118          * For some case, no need update obj's ctime (LA_CTIME is not set),
1119          * e.g. split_dir.
1120          * For other cases, update obj's ctime (LA_CTIME is set),
1121          * e.g. cmr_unlink.
1122          */
1123         if (ma->ma_attr.la_valid & LA_CTIME) {
1124                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1125                 la->la_valid = LA_CTIME | LA_MTIME;
1126                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1127                                                         handle, 0);
1128         }
1129         EXIT;
1130 out_unlock:
1131         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1132 out_trans:
1133         mdd_trans_stop(env, mdd, rc, handle);
1134 out_pending:
1135 #ifdef HAVE_QUOTA_SUPPORT
1136         /* Trigger dqrel for the parent owner.
1137          * If failed, the next call for lquota_chkquota will process it. */
1138         if (quota_opc)
1139                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1140                               quota_opc);
1141 #endif
1142         return rc;
1143 }
1144
1145 /*
1146  * tobj maybe NULL
1147  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1148  */
1149 static int mdd_rt_sanity_check(const struct lu_env *env,
1150                                struct mdd_object *tgt_pobj,
1151                                struct mdd_object *tobj,
1152                                struct md_attr *ma)
1153 {
1154         int rc;
1155         ENTRY;
1156
1157         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1158                 RETURN(0);
1159
1160         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1161          * exist. In fact, tobj must exist, otherwise the call trace will be:
1162          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1163          * When get here, tobj must be NOT NULL, the other case has been
1164          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1165          * MDS_PERM_BYPASS.
1166          * So check may_delete, but not check nlink of tgt_pobj. */
1167
1168         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1169
1170         RETURN(rc);
1171 }
1172
1173 /* Partial rename op on slave MDD */
1174 static int mdd_rename_tgt(const struct lu_env *env,
1175                           struct md_object *pobj, struct md_object *tobj,
1176                           const struct lu_fid *lf, const struct lu_name *lname,
1177                           struct md_attr *ma)
1178 {
1179         const char *name = lname->ln_name;
1180         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1181         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1182         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1183         struct mdd_device *mdd = mdo2mdd(pobj);
1184         struct dynlock_handle *dlh;
1185         struct thandle *handle;
1186 #ifdef HAVE_QUOTA_SUPPORT
1187         struct obd_device *obd = mdd->mdd_obd_dev;
1188         struct mds_obd *mds = &obd->u.mds;
1189         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1190         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1191         int quota_opc = 0, rec_pending = 0;
1192 #endif
1193         int rc;
1194         ENTRY;
1195
1196 #ifdef HAVE_QUOTA_SUPPORT
1197         if (mds->mds_quota && !tobj) {
1198                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1199
1200                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1201                 if (!rc) {
1202                         void *data = NULL;
1203                         mdd_data_get(env, mdd_tpobj, &data);
1204                         quota_opc = FSFILT_OP_LINK;
1205                         mdd_quota_wrapper(la_tmp, qpids);
1206                         /* get block quota for target parent */
1207                         lquota_chkquota(mds_quota_interface_ref, obd,
1208                                         qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1209                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK,
1210                                         data, 1);
1211                 }
1212         }
1213 #endif
1214         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
1215         handle = mdd_trans_start(env, mdd);
1216         if (IS_ERR(handle))
1217                 GOTO(out_pending, rc = PTR_ERR(handle));
1218
1219         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1220         if (dlh == NULL)
1221                 GOTO(out_trans, rc = -ENOMEM);
1222         if (tobj)
1223                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1224
1225         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1226         if (rc)
1227                 GOTO(cleanup, rc);
1228
1229         /*
1230          * If rename_tgt is called then we should just re-insert name with
1231          * correct fid, no need to dec/inc parent nlink if obj is dir.
1232          */
1233         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1234         if (rc)
1235                 GOTO(cleanup, rc);
1236
1237         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1238                                      BYPASS_CAPA);
1239         if (rc)
1240                 GOTO(cleanup, rc);
1241
1242         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1243         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1244
1245         la->la_valid = LA_CTIME | LA_MTIME;
1246         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1247         if (rc)
1248                 GOTO(cleanup, rc);
1249
1250         /*
1251          * For tobj is remote case cmm layer has processed
1252          * and pass NULL tobj to here. So when tobj is NOT NULL,
1253          * it must be local one.
1254          */
1255         if (tobj && mdd_object_exists(mdd_tobj)) {
1256                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1257
1258                 /* Remove dot reference. */
1259                 if (S_ISDIR(ma->ma_attr.la_mode))
1260                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1261
1262                 la->la_valid = LA_CTIME;
1263                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1264                 if (rc)
1265                         GOTO(cleanup, rc);
1266
1267                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1268                 if (rc)
1269                         GOTO(cleanup, rc);
1270
1271 #ifdef HAVE_QUOTA_SUPPORT
1272                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1273                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1274                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1275                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1276                 }
1277 #endif
1278         }
1279         EXIT;
1280 cleanup:
1281         if (tobj)
1282                 mdd_write_unlock(env, mdd_tobj);
1283         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1284 out_trans:
1285         if (rc == 0)
1286                 /* Bare EXT record with no RENAME in front of it signifies
1287                    a partial slave op */
1288                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
1289                                             mdd_tpobj, NULL, lname, handle);
1290
1291         mdd_trans_stop(env, mdd, rc, handle);
1292 out_pending:
1293 #ifdef HAVE_QUOTA_SUPPORT
1294         if (mds->mds_quota) {
1295                 if (rec_pending)
1296                         lquota_pending_commit(mds_quota_interface_ref, obd,
1297                                               qpids[USRQUOTA],
1298                                               qpids[GRPQUOTA],
1299                                               rec_pending, 1);
1300                 if (quota_opc)
1301                         /* Trigger dqrel/dqacq on the target owner of child and
1302                          * parent. If failed, the next call for lquota_chkquota
1303                          * will process it. */
1304                         lquota_adjust(mds_quota_interface_ref, obd, qcids,
1305                                       qpids, rc, quota_opc);
1306         }
1307 #endif
1308         return rc;
1309 }
1310
1311 /*
1312  * The permission has been checked when obj created, no need check again.
1313  */
1314 static int mdd_cd_sanity_check(const struct lu_env *env,
1315                                struct mdd_object *obj)
1316 {
1317         ENTRY;
1318
1319         /* EEXIST check */
1320         if (!obj || mdd_is_dead_obj(obj))
1321                 RETURN(-ENOENT);
1322
1323         RETURN(0);
1324
1325 }
1326
1327 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1328                            struct md_object *cobj, const struct md_op_spec *spec,
1329                            struct md_attr *ma)
1330 {
1331         struct mdd_device *mdd = mdo2mdd(cobj);
1332         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1333         struct mdd_object *son = md2mdd_obj(cobj);
1334         struct lu_attr    *attr = &ma->ma_attr;
1335         struct lov_mds_md *lmm = NULL;
1336         int                lmm_size = 0;
1337         struct thandle    *handle;
1338         int                rc;
1339         ENTRY;
1340
1341         rc = mdd_cd_sanity_check(env, son);
1342         if (rc)
1343                 RETURN(rc);
1344
1345         if (!md_should_create(spec->sp_cr_flags))
1346                 RETURN(0);
1347
1348         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1349                             spec, attr);
1350         if (rc)
1351                 RETURN(rc);
1352
1353         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1354         handle = mdd_trans_start(env, mdd);
1355         if (IS_ERR(handle))
1356                 GOTO(out_free, rc = PTR_ERR(handle));
1357
1358         /*
1359          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1360          * Should this be fixed?
1361          */
1362
1363         /* Replay creates has objects already */
1364 #if 0
1365         if (spec->no_create) {
1366                 CDEBUG(D_INFO, "we already have lov ea\n");
1367                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1368                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1369                                     spec->u.sp_ea.eadatalen, handle, 0);
1370         } else
1371 #endif
1372                 /* No need mdd_lsm_sanity_check here */
1373                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1374                                     lmm_size, handle, 0);
1375
1376         if (rc == 0)
1377                rc = mdd_attr_get_internal_locked(env, son, ma);
1378
1379         /* update lov_objid data, must be before transaction stop! */
1380         if (rc == 0)
1381                 mdd_lov_objid_update(mdd, lmm);
1382
1383         mdd_trans_stop(env, mdd, rc, handle);
1384 out_free:
1385         /* Finish mdd_lov_create() stuff. */
1386         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1387         RETURN(rc);
1388 }
1389
1390 /* Get fid from name and parent */
1391 static int
1392 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1393              const struct lu_name *lname, struct lu_fid* fid, int mask)
1394 {
1395         const char          *name = lname->ln_name;
1396         const struct dt_key *key = (const struct dt_key *)name;
1397         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1398         struct mdd_device   *m = mdo2mdd(pobj);
1399         struct dt_object    *dir = mdd_object_child(mdd_obj);
1400         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1401         int rc;
1402         ENTRY;
1403
1404         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1405                 RETURN(-ESTALE);
1406
1407         rc = mdd_object_exists(mdd_obj);
1408         if (unlikely(rc == 0))
1409                 RETURN(-ESTALE);
1410         else if (unlikely(rc < 0)) {
1411                 CERROR("Object "DFID" locates on remote server\n",
1412                         PFID(mdo2fid(mdd_obj)));
1413                 LBUG();
1414         }
1415
1416         /* The common filename length check. */
1417         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1418                 RETURN(-ENAMETOOLONG);
1419
1420         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1421                                             MOR_TGT_PARENT);
1422         if (rc)
1423                 RETURN(rc);
1424
1425         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1426                    dt_try_as_dir(env, dir))) {
1427
1428                 rc = dir->do_index_ops->dio_lookup(env, dir,
1429                                                  (struct dt_rec *)pack, key,
1430                                                  mdd_object_capa(env, mdd_obj));
1431                 if (rc > 0)
1432                         rc = fid_unpack(pack, fid);
1433                 else if (rc == 0)
1434                         rc = -ENOENT;
1435         } else
1436                 rc = -ENOTDIR;
1437
1438         RETURN(rc);
1439 }
1440
1441 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1442                           const struct lu_name *lname, struct mdd_object *child,
1443                           struct md_attr *ma, struct thandle *handle,
1444                           const struct md_op_spec *spec)
1445 {
1446         int rc;
1447         ENTRY;
1448
1449         /*
1450          * Update attributes for child.
1451          *
1452          * FIXME:
1453          *  (1) the valid bits should be converted between Lustre and Linux;
1454          *  (2) maybe, the child attributes should be set in OSD when creation.
1455          */
1456
1457         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1458         if (rc != 0)
1459                 RETURN(rc);
1460
1461         if (S_ISDIR(ma->ma_attr.la_mode)) {
1462                 /* Add "." and ".." for newly created dir */
1463                 __mdd_ref_add(env, child, handle);
1464                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1465                                              dot, handle, BYPASS_CAPA);
1466                 if (rc == 0) {
1467                         rc = __mdd_index_insert_only(env, child, pfid,
1468                                                      dotdot, handle,
1469                                                      BYPASS_CAPA);
1470                         if (rc != 0) {
1471                                 int rc2;
1472
1473                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1474                                                          handle, BYPASS_CAPA);
1475                                 if (rc2 != 0)
1476                                         CERROR("Failure to cleanup after dotdot"
1477                                                " creation: %d (%d)\n", rc2, rc);
1478                         }
1479                 }
1480         }
1481         if (rc == 0)
1482                 mdd_links_add(env, child, pfid, lname, handle);
1483
1484         RETURN(rc);
1485 }
1486
1487 /* has not lock on pobj yet */
1488 static int mdd_create_sanity_check(const struct lu_env *env,
1489                                    struct md_object *pobj,
1490                                    const struct lu_name *lname,
1491                                    struct md_attr *ma,
1492                                    struct md_op_spec *spec)
1493 {
1494         struct mdd_thread_info *info = mdd_env_info(env);
1495         struct lu_attr    *la        = &info->mti_la;
1496         struct lu_fid     *fid       = &info->mti_fid;
1497         struct mdd_object *obj       = md2mdd_obj(pobj);
1498         struct mdd_device *m         = mdo2mdd(pobj);
1499         int lookup                   = spec->sp_cr_lookup;
1500         int rc;
1501         ENTRY;
1502
1503         /* EEXIST check */
1504         if (mdd_is_dead_obj(obj))
1505                 RETURN(-ENOENT);
1506
1507         /*
1508          * In some cases this lookup is not needed - we know before if name
1509          * exists or not because MDT performs lookup for it.
1510          * name length check is done in lookup.
1511          */
1512         if (lookup) {
1513                 /*
1514                  * Check if the name already exist, though it will be checked in
1515                  * _index_insert also, for avoiding rolling back if exists
1516                  * _index_insert.
1517                  */
1518                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1519                                          MAY_WRITE | MAY_EXEC);
1520                 if (rc != -ENOENT)
1521                         RETURN(rc ? : -EEXIST);
1522         } else {
1523                 /*
1524                  * Check WRITE permission for the parent.
1525                  * EXEC permission have been checked
1526                  * when lookup before create already.
1527                  */
1528                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1529                                                     MOR_TGT_PARENT);
1530                 if (rc)
1531                         RETURN(rc);
1532         }
1533
1534         /* sgid check */
1535         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1536         if (rc != 0)
1537                 RETURN(rc);
1538
1539         if (la->la_mode & S_ISGID) {
1540                 ma->ma_attr.la_gid = la->la_gid;
1541                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1542                         ma->ma_attr.la_mode |= S_ISGID;
1543                         ma->ma_attr.la_valid |= LA_MODE;
1544                 }
1545         }
1546
1547         switch (ma->ma_attr.la_mode & S_IFMT) {
1548         case S_IFLNK: {
1549                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1550
1551                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1552                         RETURN(-ENAMETOOLONG);
1553                 else
1554                         RETURN(0);
1555         }
1556         case S_IFDIR:
1557         case S_IFREG:
1558         case S_IFCHR:
1559         case S_IFBLK:
1560         case S_IFIFO:
1561         case S_IFSOCK:
1562                 rc = 0;
1563                 break;
1564         default:
1565                 rc = -EINVAL;
1566                 break;
1567         }
1568         RETURN(rc);
1569 }
1570
1571 /*
1572  * Create object and insert it into namespace.
1573  */
1574 static int mdd_create(const struct lu_env *env,
1575                       struct md_object *pobj,
1576                       const struct lu_name *lname,
1577                       struct md_object *child,
1578                       struct md_op_spec *spec,
1579                       struct md_attr* ma)
1580 {
1581         struct mdd_thread_info *info = mdd_env_info(env);
1582         struct lu_attr         *la = &info->mti_la_for_fix;
1583         struct md_attr         *ma_acl = &info->mti_ma;
1584         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1585         struct mdd_object      *son = md2mdd_obj(child);
1586         struct mdd_device      *mdd = mdo2mdd(pobj);
1587         struct lu_attr         *attr = &ma->ma_attr;
1588         struct lov_mds_md      *lmm = NULL;
1589         struct thandle         *handle;
1590         struct dynlock_handle  *dlh;
1591         const char             *name = lname->ln_name;
1592         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1593         int got_def_acl = 0;
1594 #ifdef HAVE_QUOTA_SUPPORT
1595         struct obd_device *obd = mdd->mdd_obd_dev;
1596         struct mds_obd *mds = &obd->u.mds;
1597         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1598         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1599         int quota_opc = 0, block_count = 0;
1600         int inode_pending = 0, block_pending = 0, parent_pending = 0;
1601 #endif
1602         ENTRY;
1603
1604         /*
1605          * Two operations have to be performed:
1606          *
1607          *  - an allocation of a new object (->do_create()), and
1608          *
1609          *  - an insertion into a parent index (->dio_insert()).
1610          *
1611          * Due to locking, operation order is not important, when both are
1612          * successful, *but* error handling cases are quite different:
1613          *
1614          *  - if insertion is done first, and following object creation fails,
1615          *  insertion has to be rolled back, but this operation might fail
1616          *  also leaving us with dangling index entry.
1617          *
1618          *  - if creation is done first, is has to be undone if insertion
1619          *  fails, leaving us with leaked space, which is neither good, nor
1620          *  fatal.
1621          *
1622          * It seems that creation-first is simplest solution, but it is
1623          * sub-optimal in the frequent
1624          *
1625          *         $ mkdir foo
1626          *         $ mkdir foo
1627          *
1628          * case, because second mkdir is bound to create object, only to
1629          * destroy it immediately.
1630          *
1631          * To avoid this follow local file systems that do double lookup:
1632          *
1633          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1634          *
1635          *     1. create            (mdd_object_create_internal())
1636          *
1637          *     2. insert            (__mdd_index_insert(), lookup again)
1638          */
1639
1640         /* Sanity checks before big job. */
1641         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1642         if (rc)
1643                 RETURN(rc);
1644
1645 #ifdef HAVE_QUOTA_SUPPORT
1646         if (mds->mds_quota) {
1647                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1648
1649                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1650                 if (!rc) {
1651                         int same = 0;
1652
1653                         quota_opc = FSFILT_OP_CREATE;
1654                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1655                         mdd_quota_wrapper(la_tmp, qpids);
1656                         /* get file quota for child */
1657                         lquota_chkquota(mds_quota_interface_ref, obd,
1658                                         qcids[USRQUOTA], qcids[GRPQUOTA], 1,
1659                                         &inode_pending, NULL, 0, NULL, 0);
1660                         switch (ma->ma_attr.la_mode & S_IFMT) {
1661                         case S_IFLNK:
1662                         case S_IFDIR:
1663                                 block_count = 2;
1664                                 break;
1665                         case S_IFREG:
1666                                 block_count = 1;
1667                                 break;
1668                         }
1669                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1670                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1671                                 block_count += 1;
1672                                 same = 1;
1673                         }
1674                         /* get block quota for child and parent */
1675                         if (block_count)
1676                                 lquota_chkquota(mds_quota_interface_ref, obd,
1677                                                 qcids[USRQUOTA], qcids[GRPQUOTA],
1678                                                 block_count,
1679                                                 &block_pending, NULL,
1680                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1681                         if (!same)
1682                                 lquota_chkquota(mds_quota_interface_ref, obd,
1683                                                 qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1684                                                 &parent_pending, NULL,
1685                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1686                 }
1687         }
1688 #endif
1689
1690         /*
1691          * No RPC inside the transaction, so OST objects should be created at
1692          * first.
1693          */
1694         if (S_ISREG(attr->la_mode)) {
1695                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1696                                     spec, attr);
1697                 if (rc)
1698                         GOTO(out_pending, rc);
1699         }
1700
1701         if (!S_ISLNK(attr->la_mode)) {
1702                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1703                 ma_acl->ma_acl = info->mti_xattr_buf;
1704                 ma_acl->ma_need = MA_ACL_DEF;
1705                 ma_acl->ma_valid = 0;
1706
1707                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1708                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1709                 mdd_read_unlock(env, mdd_pobj);
1710                 if (rc)
1711                         GOTO(out_free, rc);
1712                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1713                         got_def_acl = 1;
1714         }
1715
1716         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1717         handle = mdd_trans_start(env, mdd);
1718         if (IS_ERR(handle))
1719                 GOTO(out_free, rc = PTR_ERR(handle));
1720
1721         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1722         if (dlh == NULL)
1723                 GOTO(out_trans, rc = -ENOMEM);
1724
1725         mdd_write_lock(env, son, MOR_TGT_CHILD);
1726         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
1727         if (rc) {
1728                 mdd_write_unlock(env, son);
1729                 GOTO(cleanup, rc);
1730         }
1731
1732         created = 1;
1733
1734 #ifdef CONFIG_FS_POSIX_ACL
1735         if (got_def_acl) {
1736                 struct lu_buf *acl_buf = &info->mti_buf;
1737                 acl_buf->lb_buf = ma_acl->ma_acl;
1738                 acl_buf->lb_len = ma_acl->ma_acl_size;
1739
1740                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1741                 if (rc) {
1742                         mdd_write_unlock(env, son);
1743                         GOTO(cleanup, rc);
1744                 } else {
1745                         ma->ma_attr.la_valid |= LA_MODE;
1746                 }
1747         }
1748 #endif
1749
1750         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
1751                                    son, ma, handle, spec);
1752         mdd_write_unlock(env, son);
1753         if (rc)
1754                 /*
1755                  * Object has no links, so it will be destroyed when last
1756                  * reference is released. (XXX not now.)
1757                  */
1758                 GOTO(cleanup, rc);
1759
1760         initialized = 1;
1761
1762         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1763                                 name, S_ISDIR(attr->la_mode), handle,
1764                                 mdd_object_capa(env, mdd_pobj));
1765
1766         if (rc)
1767                 GOTO(cleanup, rc);
1768
1769         inserted = 1;
1770
1771         /* No need mdd_lsm_sanity_check here */
1772         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1773         if (rc) {
1774                 CERROR("error on stripe info copy %d \n", rc);
1775                 GOTO(cleanup, rc);
1776         }
1777         if (lmm && lmm_size > 0) {
1778                 /* Set Lov here, do not get lmm again later */
1779                 memcpy(ma->ma_lmm, lmm, lmm_size);
1780                 ma->ma_lmm_size = lmm_size;
1781                 ma->ma_valid |= MA_LOV;
1782         }
1783
1784         if (S_ISLNK(attr->la_mode)) {
1785                 struct md_ucred  *uc = md_ucred(env);
1786                 struct dt_object *dt = mdd_object_child(son);
1787                 const char *target_name = spec->u.sp_symname;
1788                 int sym_len = strlen(target_name);
1789                 const struct lu_buf *buf;
1790                 loff_t pos = 0;
1791
1792                 buf = mdd_buf_get_const(env, target_name, sym_len);
1793                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1794                                                 mdd_object_capa(env, son),
1795                                                 uc->mu_cap &
1796                                                 CFS_CAP_SYS_RESOURCE_MASK);
1797
1798                 if (rc == sym_len)
1799                         rc = 0;
1800                 else
1801                         GOTO(cleanup, rc = -EFAULT);
1802         }
1803
1804         *la = ma->ma_attr;
1805         la->la_valid = LA_CTIME | LA_MTIME;
1806         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1807         if (rc)
1808                 GOTO(cleanup, rc);
1809
1810         /* Return attr back. */
1811         rc = mdd_attr_get_internal_locked(env, son, ma);
1812         EXIT;
1813 cleanup:
1814         if (rc && created) {
1815                 int rc2 = 0;
1816
1817                 if (inserted) {
1818                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1819                                                  S_ISDIR(attr->la_mode),
1820                                                  handle, BYPASS_CAPA);
1821                         if (rc2)
1822                                 CERROR("error can not cleanup destroy %d\n",
1823                                        rc2);
1824                 }
1825
1826                 if (rc2 == 0) {
1827                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1828                         __mdd_ref_del(env, son, handle, 0);
1829                         if (initialized && S_ISDIR(attr->la_mode))
1830                                 __mdd_ref_del(env, son, handle, 1);
1831                         mdd_write_unlock(env, son);
1832                 }
1833         }
1834
1835         /* update lov_objid data, must be before transaction stop! */
1836         if (rc == 0)
1837                 mdd_lov_objid_update(mdd, lmm);
1838
1839         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1840 out_trans:
1841         if (rc == 0)
1842                 rc = mdd_changelog_ns_store(env, mdd,
1843                             S_ISDIR(attr->la_mode) ? CL_MKDIR :
1844                             S_ISREG(attr->la_mode) ? CL_CREATE :
1845                             S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
1846                             son, mdd_pobj, NULL, lname, handle);
1847         mdd_trans_stop(env, mdd, rc, handle);
1848 out_free:
1849         /* finis lov_create stuff, free all temporary data */
1850         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1851 out_pending:
1852 #ifdef HAVE_QUOTA_SUPPORT
1853         if (quota_opc) {
1854                 if (inode_pending)
1855                         lquota_pending_commit(mds_quota_interface_ref, obd,
1856                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1857                                               inode_pending, 0);
1858                 if (block_pending)
1859                         lquota_pending_commit(mds_quota_interface_ref, obd,
1860                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1861                                               block_pending, 1);
1862                 if (parent_pending)
1863                         lquota_pending_commit(mds_quota_interface_ref, obd,
1864                                               qpids[USRQUOTA], qpids[GRPQUOTA],
1865                                               parent_pending, 1);
1866                 /* Trigger dqacq on the owner of child and parent. If failed,
1867                  * the next call for lquota_chkquota will process it. */
1868                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1869                               quota_opc);
1870         }
1871 #endif
1872         return rc;
1873 }
1874
1875 /*
1876  * Get locks on parents in proper order
1877  * RETURN: < 0 - error, rename_order if successful
1878  */
1879 enum rename_order {
1880         MDD_RN_SAME,
1881         MDD_RN_SRCTGT,
1882         MDD_RN_TGTSRC
1883 };
1884
1885 static int mdd_rename_order(const struct lu_env *env,
1886                             struct mdd_device *mdd,
1887                             struct mdd_object *src_pobj,
1888                             struct mdd_object *tgt_pobj)
1889 {
1890         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1891         int rc;
1892         ENTRY;
1893
1894         if (src_pobj == tgt_pobj)
1895                 RETURN(MDD_RN_SAME);
1896
1897         /* compared the parent child relationship of src_p&tgt_p */
1898         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1899                 rc = MDD_RN_SRCTGT;
1900         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1901                 rc = MDD_RN_TGTSRC;
1902         } else {
1903                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1904                 if (rc == -EREMOTE)
1905                         rc = 0;
1906
1907                 if (rc == 1)
1908                         rc = MDD_RN_TGTSRC;
1909                 else if (rc == 0)
1910                         rc = MDD_RN_SRCTGT;
1911         }
1912
1913         RETURN(rc);
1914 }
1915
1916 /* has not mdd_write{read}_lock on any obj yet. */
1917 static int mdd_rename_sanity_check(const struct lu_env *env,
1918                                    struct mdd_object *src_pobj,
1919                                    struct mdd_object *tgt_pobj,
1920                                    struct mdd_object *sobj,
1921                                    struct mdd_object *tobj,
1922                                    struct md_attr *ma)
1923 {
1924         int rc = 0;
1925         ENTRY;
1926
1927         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1928                 RETURN(0);
1929
1930         /* XXX: when get here, sobj must NOT be NULL,
1931          * the other case has been processed in cml_rename
1932          * before mdd_rename and enable MDS_PERM_BYPASS. */
1933         LASSERT(sobj);
1934
1935         if (mdd_is_dead_obj(sobj))
1936                 RETURN(-ESTALE);
1937
1938         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         /* XXX: when get here, "tobj == NULL" means tobj must
1943          * NOT exist (neither on remote MDS, such case has been
1944          * processed in cml_rename before mdd_rename and enable
1945          * MDS_PERM_BYPASS).
1946          * So check may_create, but not check may_unlink. */
1947         if (!tobj)
1948                 rc = mdd_may_create(env, tgt_pobj, NULL,
1949                                     (src_pobj != tgt_pobj), 0);
1950         else
1951                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1952                                     (src_pobj != tgt_pobj), 1);
1953
1954         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1955             S_ISDIR(ma->ma_attr.la_mode))
1956                 rc = __mdd_may_link(env, tgt_pobj);
1957
1958         RETURN(rc);
1959 }
1960
1961 /* src object can be remote that is why we use only fid and type of object */
1962 static int mdd_rename(const struct lu_env *env,
1963                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1964                       const struct lu_fid *lf, const struct lu_name *lsname,
1965                       struct md_object *tobj, const struct lu_name *ltname,
1966                       struct md_attr *ma)
1967 {
1968         const char *sname = lsname->ln_name;
1969         const char *tname = ltname->ln_name;
1970         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1971         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
1972         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1973         struct mdd_device *mdd = mdo2mdd(src_pobj);
1974         struct mdd_object *mdd_sobj = NULL;                  /* source object */
1975         struct mdd_object *mdd_tobj = NULL;
1976         struct dynlock_handle *sdlh, *tdlh;
1977         struct thandle *handle;
1978         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
1979         int is_dir;
1980         int rc;
1981
1982 #ifdef HAVE_QUOTA_SUPPORT
1983         struct obd_device *obd = mdd->mdd_obd_dev;
1984         struct mds_obd *mds = &obd->u.mds;
1985         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
1986         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
1987         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
1988         int quota_opc = 0, rec_pending = 0;
1989 #endif
1990         ENTRY;
1991
1992         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1993         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1994
1995         if (tobj)
1996                 mdd_tobj = md2mdd_obj(tobj);
1997
1998 #ifdef HAVE_QUOTA_SUPPORT
1999         if (mds->mds_quota) {
2000                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
2001
2002                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
2003                 if (!rc) {
2004                         mdd_quota_wrapper(la_tmp, qspids);
2005                         if (!tobj) {
2006                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
2007                                                 BYPASS_CAPA);
2008                                 if (!rc) {
2009                                         void *data = NULL;
2010                                         mdd_data_get(env, mdd_tpobj, &data);
2011                                         quota_opc = FSFILT_OP_LINK;
2012                                         mdd_quota_wrapper(la_tmp, qtpids);
2013                                         /* get block quota for target parent */
2014                                         lquota_chkquota(mds_quota_interface_ref,
2015                                                         obd, qtpids[USRQUOTA],
2016                                                         qtpids[GRPQUOTA], 1,
2017                                                         &rec_pending, NULL,
2018                                                         LQUOTA_FLAGS_BLK,
2019                                                         data, 1);
2020                                 }
2021                         }
2022                 }
2023         }
2024 #endif
2025         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
2026         handle = mdd_trans_start(env, mdd);
2027         if (IS_ERR(handle))
2028                 GOTO(out_pending, rc = PTR_ERR(handle));
2029
2030         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2031         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2032         if (rc < 0)
2033                 GOTO(cleanup_unlocked, rc);
2034
2035         /* Get locks in determined order */
2036         if (rc == MDD_RN_SAME) {
2037                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2038                                           sname, MOR_SRC_PARENT);
2039                 /* check hashes to determine do we need one lock or two */
2040                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2041                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2042                                 MOR_TGT_PARENT);
2043                 else
2044                         tdlh = sdlh;
2045         } else if (rc == MDD_RN_SRCTGT) {
2046                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2047                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2048         } else {
2049                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2050                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2051         }
2052         if (sdlh == NULL || tdlh == NULL)
2053                 GOTO(cleanup, rc = -ENOMEM);
2054
2055         mdd_sobj = mdd_object_find(env, mdd, lf);
2056         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2057                                      mdd_sobj, mdd_tobj, ma);
2058         if (rc)
2059                 GOTO(cleanup, rc);
2060
2061         /* Remove source name from source directory */
2062         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2063                                 mdd_object_capa(env, mdd_spobj));
2064         if (rc)
2065                 GOTO(cleanup, rc);
2066
2067         /* "mv dir1 dir2" needs "dir1/.." link update */
2068         if (is_dir && mdd_sobj) {
2069                 rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
2070                                         mdd_object_capa(env, mdd_spobj));
2071                 if (rc)
2072                        GOTO(cleanup, rc);
2073
2074                 rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
2075                                         is_dir, handle,
2076                                         mdd_object_capa(env, mdd_tpobj));
2077                 if (rc)
2078                         GOTO(cleanup, rc);
2079         }
2080
2081         /* Remove target name from target directory
2082          * Here tobj can be remote one, so we do index_delete unconditionally
2083          * and -ENOENT is allowed.
2084          */
2085         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2086                                 mdd_object_capa(env, mdd_tpobj));
2087         if (rc != 0 && rc != -ENOENT)
2088                 GOTO(cleanup, rc);
2089
2090         /* Insert new fid with target name into target dir */
2091         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2092                                 mdd_object_capa(env, mdd_tpobj));
2093         if (rc)
2094                 GOTO(cleanup, rc);
2095
2096         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2097         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2098
2099         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2100         if (mdd_sobj) {
2101                 la->la_valid = LA_CTIME;
2102                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
2103                                                         handle, 0);
2104                 if (rc)
2105                         GOTO(cleanup, rc);
2106         }
2107
2108         /* Remove old target object
2109          * For tobj is remote case cmm layer has processed
2110          * and set tobj to NULL then. So when tobj is NOT NULL,
2111          * it must be local one.
2112          */
2113         if (tobj && mdd_object_exists(mdd_tobj)) {
2114                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2115                 if (mdd_is_dead_obj(mdd_tobj)) {
2116                         mdd_write_unlock(env, mdd_tobj);
2117                         /* shld not be dead, something is wrong */
2118                         CERROR("tobj is dead\n");
2119                         rc = -EINVAL;
2120                         goto cleanup;
2121                 }
2122                 __mdd_ref_del(env, mdd_tobj, handle, 0);
2123
2124                 /* Remove dot reference. */
2125                 if (is_dir)
2126                         __mdd_ref_del(env, mdd_tobj, handle, 1);
2127
2128                 la->la_valid = LA_CTIME;
2129                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2130                 if (rc)
2131                         GOTO(cleanup, rc);
2132
2133                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2134                 mdd_write_unlock(env, mdd_tobj);
2135                 if (rc)
2136                         GOTO(cleanup, rc);
2137
2138 #ifdef HAVE_QUOTA_SUPPORT
2139                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2140                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
2141                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2142                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
2143                 }
2144 #endif
2145         }
2146
2147         la->la_valid = LA_CTIME | LA_MTIME;
2148         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
2149         if (rc)
2150                 GOTO(cleanup, rc);
2151
2152         if (mdd_spobj != mdd_tpobj) {
2153                 la->la_valid = LA_CTIME | LA_MTIME;
2154                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
2155                                                   handle, 0);
2156         }
2157
2158         if (rc == 0 && mdd_sobj) {
2159                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2160                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2161                                       mdo2fid(mdd_tpobj), ltname, handle);
2162                 if (rc == -ENOENT)
2163                         /* Old files might not have EA entry */
2164                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2165                                       lsname, handle);
2166                 mdd_write_unlock(env, mdd_sobj);
2167                 /* We don't fail the transaction if the link ea can't be
2168                    updated -- fid2path will use alternate lookup method. */
2169                 rc = 0;
2170         }
2171
2172         EXIT;
2173 cleanup:
2174         if (likely(tdlh) && sdlh != tdlh)
2175                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2176         if (likely(sdlh))
2177                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2178 cleanup_unlocked:
2179         if (rc == 0)
2180                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj,
2181                                             mdd_spobj, lf, lsname, handle);
2182         if (rc == 0)
2183                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
2184                                             mdd_tpobj, lf, ltname, handle);
2185
2186         mdd_trans_stop(env, mdd, rc, handle);
2187         if (mdd_sobj)
2188                 mdd_object_put(env, mdd_sobj);
2189 out_pending:
2190 #ifdef HAVE_QUOTA_SUPPORT
2191         if (mds->mds_quota) {
2192                 if (rec_pending)
2193                         lquota_pending_commit(mds_quota_interface_ref, obd,
2194                                               qtpids[USRQUOTA],
2195                                               qtpids[GRPQUOTA],
2196                                               rec_pending, 1);
2197                 /* Trigger dqrel on the source owner of parent.
2198                  * If failed, the next call for lquota_chkquota will
2199                  * process it. */
2200                 lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2201                               FSFILT_OP_UNLINK_PARTIAL_PARENT);
2202                 if (quota_opc)
2203                         /* Trigger dqrel/dqacq on the target owner of child and
2204                          * parent. If failed, the next call for lquota_chkquota
2205                          * will process it. */
2206                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2207                                       qtpids, rc, quota_opc);
2208         }
2209 #endif
2210         return rc;
2211 }
2212
2213 /** enable/disable storing of hardlink info */
2214 int mdd_linkea_enable = 1;
2215 CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644,
2216                 "record hardlink info in EAs");
2217
2218 /** Read the link EA into a temp buffer.
2219  * Uses the name_buf since it is generally large.
2220  * \retval IS_ERR err
2221  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2222  */
2223 struct lu_buf *mdd_links_get(const struct lu_env *env,
2224                              struct mdd_object *mdd_obj)
2225 {
2226         struct lu_buf *buf;
2227         struct lustre_capa *capa;
2228         struct link_ea_header *leh;
2229         int rc;
2230
2231         /* First try a small buf */
2232         buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2233         if (buf->lb_buf == NULL)
2234                 return ERR_PTR(-ENOMEM);
2235
2236         capa = mdd_object_capa(env, mdd_obj);
2237         rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2238         if (rc == -ERANGE) {
2239                 /* Buf was too small, figure out what we need. */
2240                 buf->lb_buf = NULL;
2241                 buf->lb_len = 0;
2242                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2243                 if (rc < 0)
2244                         return ERR_PTR(rc);
2245                 buf = mdd_buf_alloc(env, rc);
2246                 if (buf->lb_buf == NULL)
2247                         return ERR_PTR(-ENOMEM);
2248                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2249         }
2250         if (rc < 0)
2251                 return ERR_PTR(rc);
2252
2253         leh = buf->lb_buf;
2254         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2255                 leh->leh_magic = LINK_EA_MAGIC;
2256                 leh->leh_reccount = __swab32(leh->leh_reccount);
2257                 leh->leh_len = __swab64(leh->leh_len);
2258                 /* entries are swabbed by mdd_lee_unpack */
2259         }
2260         if (leh->leh_magic != LINK_EA_MAGIC)
2261                 return ERR_PTR(-EINVAL);
2262         if (leh->leh_reccount == 0)
2263                 return ERR_PTR(-ENODATA);
2264
2265         return buf;
2266 }
2267
2268 /** Pack a link_ea_entry.
2269  * All elements are stored as chars to avoid alignment issues.
2270  * Numbers are always big-endian
2271  * \param packbuf is a temp fid buffer
2272  * \retval record length
2273  */
2274 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2275                          const struct lu_fid *pfid, struct lu_fid* packbuf)
2276 {
2277         char *ptr;
2278         int reclen;
2279
2280         fid_pack(&lee->lee_parent_fid, pfid, packbuf);
2281         ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len;
2282         strncpy(ptr, lname->ln_name, lname->ln_namelen);
2283         reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen +
2284                 sizeof(lee->lee_reclen);
2285         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2286         lee->lee_reclen[1] = reclen & 0xff;
2287         return reclen;
2288 }
2289
2290 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2291                     struct lu_name *lname, struct lu_fid *pfid)
2292 {
2293         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2294         fid_unpack(&lee->lee_parent_fid, pfid);
2295         lname->ln_name = (char *)&lee->lee_parent_fid +
2296                 lee->lee_parent_fid.fp_len;
2297         lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len -
2298                 sizeof(lee->lee_reclen);
2299 }
2300
2301 /** Add a record to the end of link ea buf */
2302 static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
2303                            const struct lu_fid *pfid,
2304                            const struct lu_name *lname)
2305 {
2306         struct link_ea_header *leh;
2307         struct link_ea_entry *lee;
2308         int reclen;
2309
2310         if (lname == NULL || pfid == NULL)
2311                 return -EINVAL;
2312
2313         /* Make sure our buf is big enough for the new one */
2314         leh = buf->lb_buf;
2315         reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2316         if (leh->leh_len + reclen > buf->lb_len) {
2317                 if (mdd_buf_grow(env, leh->leh_len + reclen) < 0)
2318                         return -ENOMEM;
2319         }
2320
2321         leh = buf->lb_buf;
2322         lee = buf->lb_buf + leh->leh_len;
2323         reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2);
2324         leh->leh_len += reclen;
2325         leh->leh_reccount++;
2326         return 0;
2327 }
2328
2329 /* For pathologic linkers, we don't want to spend lots of time scanning the
2330  * link ea.  Limit ourseleves to something reasonable; links not in the EA
2331  * can be looked up via (slower) parent lookup.
2332  */
2333 #define LINKEA_MAX_COUNT 128
2334
2335 static int mdd_links_add(const struct lu_env *env,
2336                          struct mdd_object *mdd_obj,
2337                          const struct lu_fid *pfid,
2338                          const struct lu_name *lname,
2339                          struct thandle *handle)
2340 {
2341         struct lu_buf *buf;
2342         struct link_ea_header *leh;
2343         int rc;
2344         ENTRY;
2345
2346         if (!mdd_linkea_enable)
2347                 RETURN(0);
2348
2349         buf = mdd_links_get(env, mdd_obj);
2350         if (IS_ERR(buf)) {
2351                 rc = PTR_ERR(buf);
2352                 if (rc != -ENODATA) {
2353                         CERROR("link_ea read failed %d "DFID"\n", rc,
2354                                PFID(mdd_object_fid(mdd_obj)));
2355                         RETURN (rc);
2356                 }
2357                 /* empty EA; start one */
2358                 buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2359                 if (buf->lb_buf == NULL)
2360                         RETURN(-ENOMEM);
2361                 leh = buf->lb_buf;
2362                 leh->leh_magic = LINK_EA_MAGIC;
2363                 leh->leh_len = sizeof(struct link_ea_header);
2364                 leh->leh_reccount = 0;
2365         }
2366
2367         leh = buf->lb_buf;
2368         if (leh->leh_reccount > LINKEA_MAX_COUNT)
2369                 RETURN(-EOVERFLOW);
2370
2371         rc = __mdd_links_add(env, buf, pfid, lname);
2372         if (rc)
2373                 RETURN(rc);
2374
2375         leh = buf->lb_buf;
2376         rc = __mdd_xattr_set(env, mdd_obj,
2377                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2378                              XATTR_NAME_LINK, 0, handle);
2379         if (rc)
2380                 CERROR("link_ea add failed %d "DFID"\n", rc,
2381                        PFID(mdd_object_fid(mdd_obj)));
2382
2383         if (buf->lb_vmalloc)
2384                 /* if we vmalloced a large buffer drop it */
2385                 mdd_buf_put(buf);
2386
2387         RETURN (rc);
2388 }
2389
2390 static int mdd_links_rename(const struct lu_env *env,
2391                             struct mdd_object *mdd_obj,
2392                             const struct lu_fid *oldpfid,
2393                             const struct lu_name *oldlname,
2394                             const struct lu_fid *newpfid,
2395                             const struct lu_name *newlname,
2396                             struct thandle *handle)
2397 {
2398         struct lu_buf  *buf;
2399         struct link_ea_header *leh;
2400         struct link_ea_entry  *lee;
2401         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
2402         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2403         int reclen = 0;
2404         int count;
2405         int rc, rc2 = 0;
2406         ENTRY;
2407
2408         if (!mdd_linkea_enable)
2409                 RETURN(0);
2410
2411         if (mdd_obj->mod_flags & DEAD_OBJ)
2412                 /* No more links, don't bother */
2413                 RETURN(0);
2414
2415         buf = mdd_links_get(env, mdd_obj);
2416         if (IS_ERR(buf)) {
2417                 rc = PTR_ERR(buf);
2418                 CERROR("link_ea read failed %d "DFID"\n",
2419                        rc, PFID(mdd_object_fid(mdd_obj)));
2420                 RETURN(rc);
2421         }
2422         leh = buf->lb_buf;
2423         lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
2424
2425         /* Find the old record */
2426         for(count = 0; count <= leh->leh_reccount; count++) {
2427                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
2428                 if (tmpname->ln_namelen == oldlname->ln_namelen &&
2429                     lu_fid_eq(tmpfid, oldpfid) &&
2430                     (strncmp(tmpname->ln_name, oldlname->ln_name,
2431                              tmpname->ln_namelen) == 0))
2432                         break;
2433                 lee = (struct link_ea_entry *)((char *)lee + reclen);
2434         }
2435         if (count > leh->leh_reccount) {
2436                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2437                        oldlname->ln_namelen, oldlname->ln_name);
2438                 GOTO(out, rc = -ENOENT);
2439         }
2440
2441         /* Remove the old record */
2442         leh->leh_reccount--;
2443         leh->leh_len -= reclen;
2444         memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len -
2445                 (char *)lee);
2446
2447         /* If renaming, add the new record */
2448         if (newpfid != NULL) {
2449                 /* if the add fails, we still delete the out-of-date old link */
2450                 rc2 = __mdd_links_add(env, buf, newpfid, newlname);
2451                 leh = buf->lb_buf;
2452         }
2453
2454         rc = __mdd_xattr_set(env, mdd_obj,
2455                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2456                              XATTR_NAME_LINK, 0, handle);
2457
2458 out:
2459         if (rc == 0)
2460                 rc = rc2;
2461         if (rc)
2462                 CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n",
2463                        oldlname->ln_namelen, oldlname->ln_name, rc,
2464                        PFID(mdd_object_fid(mdd_obj)));
2465
2466         if (buf->lb_vmalloc)
2467                 /* if we vmalloced a large buffer drop it */
2468                 mdd_buf_put(buf);
2469
2470         RETURN (rc);
2471 }
2472
2473 const struct md_dir_operations mdd_dir_ops = {
2474         .mdo_is_subdir     = mdd_is_subdir,
2475         .mdo_lookup        = mdd_lookup,
2476         .mdo_create        = mdd_create,
2477         .mdo_rename        = mdd_rename,
2478         .mdo_link          = mdd_link,
2479         .mdo_unlink        = mdd_unlink,
2480         .mdo_name_insert   = mdd_name_insert,
2481         .mdo_name_remove   = mdd_name_remove,
2482         .mdo_rename_tgt    = mdd_rename_tgt,
2483         .mdo_create_data   = mdd_create_data
2484 };