Whamcloud - gitweb
c6b57d88e203440c4bd82ef169537cff77b71d4f
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55
56 #include <linux/ldiskfs_fs.h>
57 #include <lustre_mds.h>
58 #include <lustre/lustre_idl.h>
59 #include <lustre_fid.h>
60
61 #include "mdd_internal.h"
62
63 static const char dot[] = ".";
64 static const char dotdot[] = "..";
65
66 static struct lu_name lname_dotdot = {
67         (char *) dotdot,
68         sizeof(dotdot) - 1
69 };
70
71 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
72                         const struct lu_name *lname, struct lu_fid* fid,
73                         int mask);
74 static int mdd_links_add(const struct lu_env *env,
75                          struct mdd_object *mdd_obj,
76                          const struct lu_fid *pfid,
77                          const struct lu_name *lname,
78                          struct thandle *handle);
79 static int mdd_links_rename(const struct lu_env *env,
80                             struct mdd_object *mdd_obj,
81                             const struct lu_fid *oldpfid,
82                             const struct lu_name *oldlname,
83                             const struct lu_fid *newpfid,
84                             const struct lu_name *newlname,
85                             struct thandle *handle);
86
87 static int
88 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
89                     const struct lu_name *lname, struct lu_fid* fid, int mask)
90 {
91         const char *name = lname->ln_name;
92         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
93         struct dynlock_handle *dlh;
94         int rc;
95
96         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
97         if (unlikely(dlh == NULL))
98                 return -ENOMEM;
99         rc = __mdd_lookup(env, pobj, lname, fid, mask);
100         mdd_pdo_read_unlock(env, mdd_obj, dlh);
101
102         return rc;
103 }
104
105 int mdd_lookup(const struct lu_env *env,
106                struct md_object *pobj, const struct lu_name *lname,
107                struct lu_fid* fid, struct md_op_spec *spec)
108 {
109         int rc;
110         ENTRY;
111         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
112         RETURN(rc);
113 }
114
115 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
116                           struct lu_fid *fid)
117 {
118         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
119 }
120
121 /*
122  * For root fid use special function, which does not compare version component
123  * of fid. Version component is different for root fids on all MDTs.
124  */
125 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
126 {
127         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
128                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
129 }
130
131 /*
132  * return 1: if lf is the fid of the ancestor of p1;
133  * return 0: if not;
134  *
135  * return -EREMOTE: if remote object is found, in this
136  * case fid of remote object is saved to @pf;
137  *
138  * otherwise: values < 0, errors.
139  */
140 static int mdd_is_parent(const struct lu_env *env,
141                          struct mdd_device *mdd,
142                          struct mdd_object *p1,
143                          const struct lu_fid *lf,
144                          struct lu_fid *pf)
145 {
146         struct mdd_object *parent = NULL;
147         struct lu_fid *pfid;
148         int rc;
149         ENTRY;
150
151         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
152         pfid = &mdd_env_info(env)->mti_fid;
153
154         /* Check for root first. */
155         if (mdd_is_root(mdd, mdo2fid(p1)))
156                 RETURN(0);
157
158         for(;;) {
159                 /* this is done recursively, bypass capa for each obj */
160                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
161                 rc = mdd_parent_fid(env, p1, pfid);
162                 if (rc)
163                         GOTO(out, rc);
164                 if (mdd_is_root(mdd, pfid))
165                         GOTO(out, rc = 0);
166                 if (lu_fid_eq(pfid, lf))
167                         GOTO(out, rc = 1);
168                 if (parent)
169                         mdd_object_put(env, parent);
170                 parent = mdd_object_find(env, mdd, pfid);
171
172                 /* cross-ref parent */
173                 if (parent == NULL) {
174                         if (pf != NULL)
175                                 *pf = *pfid;
176                         GOTO(out, rc = -EREMOTE);
177                 } else if (IS_ERR(parent))
178                         GOTO(out, rc = PTR_ERR(parent));
179                 p1 = parent;
180         }
181         EXIT;
182 out:
183         if (parent && !IS_ERR(parent))
184                 mdd_object_put(env, parent);
185         return rc;
186 }
187
188 /*
189  * No permission check is needed.
190  *
191  * returns 1: if fid is ancestor of @mo;
192  * returns 0: if fid is not a ancestor of @mo;
193  *
194  * returns EREMOTE if remote object is found, fid of remote object is saved to
195  * @fid;
196  *
197  * returns < 0: if error
198  */
199 static int mdd_is_subdir(const struct lu_env *env,
200                          struct md_object *mo, const struct lu_fid *fid,
201                          struct lu_fid *sfid)
202 {
203         struct mdd_device *mdd = mdo2mdd(mo);
204         int rc;
205         ENTRY;
206
207         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
208                 RETURN(0);
209
210         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
211         if (rc == 0) {
212                 /* found root */
213                 fid_zero(sfid);
214         } else if (rc == 1) {
215                 /* found @fid is parent */
216                 *sfid = *fid;
217                 rc = 0;
218         }
219         RETURN(rc);
220 }
221
222 /*
223  * Check that @dir contains no entries except (possibly) dot and dotdot.
224  *
225  * Returns:
226  *
227  *             0        empty
228  *      -ENOTDIR        not a directory object
229  *    -ENOTEMPTY        not empty
230  *           -ve        other error
231  *
232  */
233 static int mdd_dir_is_empty(const struct lu_env *env,
234                             struct mdd_object *dir)
235 {
236         struct dt_it     *it;
237         struct dt_object *obj;
238         const struct dt_it_ops *iops;
239         int result;
240         ENTRY;
241
242         obj = mdd_object_child(dir);
243         if (!dt_try_as_dir(env, obj))
244                 RETURN(-ENOTDIR);
245
246         iops = &obj->do_index_ops->dio_it;
247         it = iops->init(env, obj, BYPASS_CAPA);
248         if (it != NULL) {
249                 result = iops->get(env, it, (const void *)"");
250                 if (result > 0) {
251                         int i;
252                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
253                                 result = iops->next(env, it);
254                         if (result == 0)
255                                 result = -ENOTEMPTY;
256                         else if (result == +1)
257                                 result = 0;
258                 } else if (result == 0)
259                         /*
260                          * Huh? Index contains no zero key?
261                          */
262                         result = -EIO;
263
264                 iops->put(env, it);
265                 iops->fini(env, it);
266         } else
267                 result = -ENOMEM;
268         RETURN(result);
269 }
270
271 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
272 {
273         struct mdd_device *m = mdd_obj2mdd_dev(obj);
274         struct lu_attr *la = &mdd_env_info(env)->mti_la;
275         int rc;
276         ENTRY;
277
278         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
279         if (rc)
280                 RETURN(rc);
281
282         /*
283          * Subdir count limitation can be broken through.
284          */
285         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
286             !S_ISDIR(la->la_mode))
287                 RETURN(-EMLINK);
288         else
289                 RETURN(0);
290 }
291
292 /*
293  * Check whether it may create the cobj under the pobj.
294  * cobj maybe NULL
295  */
296 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
297                    struct mdd_object *cobj, int check_perm, int check_nlink)
298 {
299         int rc = 0;
300         ENTRY;
301
302         if (cobj && mdd_object_exists(cobj))
303                 RETURN(-EEXIST);
304
305         if (mdd_is_dead_obj(pobj))
306                 RETURN(-ENOENT);
307
308         if (check_perm)
309                 rc = mdd_permission_internal_locked(env, pobj, NULL,
310                                                     MAY_WRITE | MAY_EXEC,
311                                                     MOR_TGT_PARENT);
312
313         if (!rc && check_nlink)
314                 rc = __mdd_may_link(env, pobj);
315
316         RETURN(rc);
317 }
318
319 /*
320  * Check whether can unlink from the pobj in the case of "cobj == NULL".
321  */
322 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
323                    const struct md_attr *ma)
324 {
325         int rc;
326         ENTRY;
327
328         if (mdd_is_dead_obj(pobj))
329                 RETURN(-ENOENT);
330
331         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
332             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
333                 RETURN(-EPERM);
334
335         rc = mdd_permission_internal_locked(env, pobj, NULL,
336                                             MAY_WRITE | MAY_EXEC,
337                                             MOR_TGT_PARENT);
338         if (rc)
339                 RETURN(rc);
340
341         if (mdd_is_append(pobj))
342                 RETURN(-EPERM);
343
344         RETURN(rc);
345 }
346
347 /*
348  * pobj == NULL is remote ops case, under such case, pobj's
349  * VTX feature has been checked already, no need check again.
350  */
351 static inline int mdd_is_sticky(const struct lu_env *env,
352                                 struct mdd_object *pobj,
353                                 struct mdd_object *cobj)
354 {
355         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
356         struct md_ucred *uc = md_ucred(env);
357         int rc;
358
359         if (pobj) {
360                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
361                 if (rc)
362                         return rc;
363
364                 if (!(tmp_la->la_mode & S_ISVTX) ||
365                      (tmp_la->la_uid == uc->mu_fsuid))
366                         return 0;
367         }
368
369         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
370         if (rc)
371                 return rc;
372
373         if (tmp_la->la_uid == uc->mu_fsuid)
374                 return 0;
375
376         return !mdd_capable(uc, CFS_CAP_FOWNER);
377 }
378
379 /*
380  * Check whether it may delete the cobj from the pobj.
381  * pobj maybe NULL
382  */
383 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
384                    struct mdd_object *cobj, struct md_attr *ma,
385                    int check_perm, int check_empty)
386 {
387         int rc = 0;
388         ENTRY;
389
390         LASSERT(cobj);
391         if (!mdd_object_exists(cobj))
392                 RETURN(-ENOENT);
393
394         if (pobj) {
395                 if (mdd_is_dead_obj(pobj))
396                         RETURN(-ENOENT);
397
398                 if (check_perm) {
399                         rc = mdd_permission_internal_locked(env, pobj, NULL,
400                                                     MAY_WRITE | MAY_EXEC,
401                                                     MOR_TGT_PARENT);
402                         if (rc)
403                                 RETURN(rc);
404                 }
405
406                 if (mdd_is_append(pobj))
407                         RETURN(-EPERM);
408         }
409
410         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
411             mdd_is_sticky(env, pobj, cobj))
412                 RETURN(-EPERM);
413
414         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
415                 RETURN(-EPERM);
416
417         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
418             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
419                 RETURN(-EPERM);
420
421         if (S_ISDIR(ma->ma_attr.la_mode)) {
422                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
423
424                 if (!S_ISDIR(mdd_object_type(cobj)))
425                         RETURN(-ENOTDIR);
426
427                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
428                         RETURN(-EBUSY);
429         } else if (S_ISDIR(mdd_object_type(cobj)))
430                 RETURN(-EISDIR);
431
432         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
433                 rc = mdd_dir_is_empty(env, cobj);
434
435         RETURN(rc);
436 }
437
438 /*
439  * tgt maybe NULL
440  * has mdd_write_lock on src already, but not on tgt yet
441  */
442 int mdd_link_sanity_check(const struct lu_env *env,
443                           struct mdd_object *tgt_obj,
444                           const struct lu_name *lname,
445                           struct mdd_object *src_obj)
446 {
447         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
448         int rc = 0;
449         ENTRY;
450
451         /* Local ops, no lookup before link, check filename length here. */
452         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
453                 RETURN(-ENAMETOOLONG);
454
455         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
456                 RETURN(-EPERM);
457
458         if (S_ISDIR(mdd_object_type(src_obj)))
459                 RETURN(-EPERM);
460
461         LASSERT(src_obj != tgt_obj);
462         if (tgt_obj) {
463                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
464                 if (rc)
465                         RETURN(rc);
466         }
467
468         rc = __mdd_may_link(env, src_obj);
469
470         RETURN(rc);
471 }
472
473 /**
474  * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
475  * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
476  * (maybe too large to be represented). It is a trick to break through the
477  * "i_nlink" limitation for subdir count.
478  */
479 void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
480                    struct thandle *handle)
481 {
482         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
483         struct mdd_device *m = mdd_obj2mdd_dev(obj);
484
485         if (!mdd_is_mnlink(obj)) {
486                 if (S_ISDIR(mdd_object_type(obj))) {
487                         if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
488                                 return;
489
490                         if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
491                                 obj->mod_flags |= MNLINK_OBJ;
492                                 tmp_la->la_nlink = 1;
493                                 tmp_la->la_valid = LA_NLINK;
494                                 mdd_attr_set_internal(env, obj, tmp_la, handle,
495                                                       0);
496                                 return;
497                         }
498                 }
499                 mdo_ref_add(env, obj, handle);
500         }
501 }
502
503 void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
504                    struct thandle *handle, int is_dot)
505 {
506         if (!mdd_is_mnlink(obj) || is_dot)
507                 mdo_ref_del(env, obj, handle);
508 }
509
510 /* insert named index, add reference if isdir */
511 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
512                               const struct lu_fid *lf, const char *name, int is_dir,
513                               struct thandle *handle, struct lustre_capa *capa)
514 {
515         struct dt_object *next = mdd_object_child(pobj);
516         int               rc;
517         ENTRY;
518
519         if (dt_try_as_dir(env, next)) {
520                 struct md_ucred  *uc = md_ucred(env);
521
522                 rc = next->do_index_ops->dio_insert(env, next,
523                                                     __mdd_fid_rec(env, lf),
524                                                     (const struct dt_key *)name,
525                                                     handle, capa, uc->mu_cap &
526                                                     CFS_CAP_SYS_RESOURCE_MASK);
527         } else {
528                 rc = -ENOTDIR;
529         }
530
531         if (rc == 0) {
532                 if (is_dir) {
533                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
534                         __mdd_ref_add(env, pobj, handle);
535                         mdd_write_unlock(env, pobj);
536                 }
537         }
538         RETURN(rc);
539 }
540
541 /* delete named index, drop reference if isdir */
542 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
543                               const char *name, int is_dir, struct thandle *handle,
544                               struct lustre_capa *capa)
545 {
546         struct dt_object *next = mdd_object_child(pobj);
547         int               rc;
548         ENTRY;
549
550         if (dt_try_as_dir(env, next)) {
551                 rc = next->do_index_ops->dio_delete(env, next,
552                                                     (struct dt_key *)name,
553                                                     handle, capa);
554                 if (rc == 0 && is_dir) {
555                         int is_dot = 0;
556
557                         if (name != NULL && name[0] == '.' && name[1] == 0)
558                                 is_dot = 1;
559                         mdd_write_lock(env, pobj, MOR_TGT_PARENT);
560                         __mdd_ref_del(env, pobj, handle, is_dot);
561                         mdd_write_unlock(env, pobj);
562                 }
563         } else
564                 rc = -ENOTDIR;
565
566         RETURN(rc);
567 }
568
569 static int
570 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
571                         const struct lu_fid *lf, const char *name,
572                         struct thandle *handle, struct lustre_capa *capa)
573 {
574         struct dt_object *next = mdd_object_child(pobj);
575         int               rc;
576         ENTRY;
577
578         if (dt_try_as_dir(env, next)) {
579                 struct md_ucred  *uc = md_ucred(env);
580
581                 rc = next->do_index_ops->dio_insert(env, next,
582                                                     __mdd_fid_rec(env, lf),
583                                                     (const struct dt_key *)name,
584                                                     handle, capa, uc->mu_cap &
585                                                     CFS_CAP_SYS_RESOURCE_MASK);
586         } else {
587                 rc = -ENOTDIR;
588         }
589         RETURN(rc);
590 }
591
592 /** Store a namespace change changelog record
593  * If this fails, we must fail the whole transaction; we don't
594  * want the change to commit without the log entry.
595  * \param target - mdd_object of change
596  * \param parent - parent dir/object
597  * \param tf - target lu_fid, overrides fid of \a target if this is non-null
598  * \param tname - target name string
599  * \param handle - transacion handle
600  */
601 static int mdd_changelog_ns_store(const struct lu_env  *env,
602                                   struct mdd_device    *mdd,
603                                   enum changelog_rec_type type,
604                                   struct mdd_object    *target,
605                                   struct mdd_object    *parent,
606                                   const struct lu_fid  *tf,
607                                   const struct lu_name *tname,
608                                   struct thandle *handle)
609 {
610         const struct lu_fid *tfid;
611         const struct lu_fid *tpfid = mdo2fid(parent);
612         struct llog_changelog_rec *rec;
613         struct lu_buf *buf;
614         int reclen;
615         int rc;
616         ENTRY;
617
618         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
619                 RETURN(0);
620
621         LASSERT(parent != NULL);
622         LASSERT(tname != NULL);
623         LASSERT(handle != NULL);
624
625         /* target */
626         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
627         buf = mdd_buf_alloc(env, reclen);
628         if (buf->lb_buf == NULL)
629                 RETURN(-ENOMEM);
630         rec = (struct llog_changelog_rec *)buf->lb_buf;
631
632         rec->cr_flags = CLF_VERSION;
633         rec->cr_type = (__u32)type;
634         tfid = tf ? tf : mdo2fid(target);
635         rec->cr_tfid = *tfid;
636         rec->cr_pfid = *tpfid;
637         rec->cr_namelen = tname->ln_namelen;
638         memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen);
639         if (likely(target))
640                 target->mod_cltime = cfs_time_current_64();
641
642         rc = mdd_changelog_llog_write(mdd, rec, handle);
643         if (rc < 0) {
644                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
645                        rc, type, tname->ln_name, PFID(tfid), PFID(tpfid));
646                 return -EFAULT;
647         }
648
649         return 0;
650 }
651
652 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
653                     struct md_object *src_obj, const struct lu_name *lname,
654                     struct md_attr *ma)
655 {
656         const char *name = lname->ln_name;
657         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
658         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
659         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
660         struct mdd_device *mdd = mdo2mdd(src_obj);
661         struct dynlock_handle *dlh;
662         struct thandle *handle;
663 #ifdef HAVE_QUOTA_SUPPORT
664         struct obd_device *obd = mdd->mdd_obd_dev;
665         struct mds_obd *mds = &obd->u.mds;
666         unsigned int qids[MAXQUOTAS] = { 0, 0 };
667         int quota_opc = 0, rec_pending = 0;
668 #endif
669         int rc;
670         ENTRY;
671
672 #ifdef HAVE_QUOTA_SUPPORT
673         if (mds->mds_quota) {
674                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
675
676                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
677                 if (!rc) {
678                         quota_opc = FSFILT_OP_LINK;
679                         mdd_quota_wrapper(la_tmp, qids);
680                         /* get block quota for parent */
681                         lquota_chkquota(mds_quota_interface_ref, obd,
682                                         qids[USRQUOTA], qids[GRPQUOTA], 1,
683                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK,
684                                         NULL, 0);
685                 }
686         }
687 #endif
688
689         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
690         handle = mdd_trans_start(env, mdd);
691         if (IS_ERR(handle))
692                 GOTO(out_pending, rc = PTR_ERR(handle));
693
694         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
695         if (dlh == NULL)
696                 GOTO(out_trans, rc = -ENOMEM);
697         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
698
699         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
700         if (rc)
701                 GOTO(out_unlock, rc);
702
703         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
704                                      name, handle,
705                                      mdd_object_capa(env, mdd_tobj));
706         if (rc)
707                 GOTO(out_unlock, rc);
708
709         __mdd_ref_add(env, mdd_sobj, handle);
710
711         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
712         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
713
714         la->la_valid = LA_CTIME | LA_MTIME;
715         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
716         if (rc)
717                 GOTO(out_unlock, rc);
718
719         la->la_valid = LA_CTIME;
720         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
721         if (rc == 0)
722                 mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle);
723
724         EXIT;
725 out_unlock:
726         mdd_write_unlock(env, mdd_sobj);
727         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
728 out_trans:
729         if (rc == 0)
730                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj,
731                                             mdd_tobj, NULL, lname, handle);
732         mdd_trans_stop(env, mdd, rc, handle);
733 out_pending:
734 #ifdef HAVE_QUOTA_SUPPORT
735         if (quota_opc) {
736                 if (rec_pending)
737                         lquota_pending_commit(mds_quota_interface_ref, obd,
738                                               qids[USRQUOTA], qids[GRPQUOTA],
739                                               rec_pending, 1);
740                 /* Trigger dqacq for the parent owner. If failed,
741                  * the next call for lquota_chkquota will process it. */
742                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
743                               quota_opc);
744         }
745 #endif
746         return rc;
747 }
748
749 /* caller should take a lock before calling */
750 int mdd_finish_unlink(const struct lu_env *env,
751                       struct mdd_object *obj, struct md_attr *ma,
752                       struct thandle *th)
753 {
754         int rc;
755         int reset = 1;
756         ENTRY;
757
758         rc = mdd_iattr_get(env, obj, ma);
759         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
760                 /* add new orphan and the object
761                  * will be deleted during mdd_close() */
762                 if (obj->mod_count) {
763                         rc = __mdd_orphan_add(env, obj, th);
764                         if (rc == 0)
765                                 obj->mod_flags |= ORPHAN_OBJ;
766                 }
767
768                 obj->mod_flags |= DEAD_OBJ;
769                 if (!(obj->mod_flags & ORPHAN_OBJ)) {
770                         rc = mdd_object_kill(env, obj, ma);
771                         if (rc == 0)
772                                 reset = 0;
773                 }
774
775         }
776         if (reset)
777                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
778
779         RETURN(rc);
780 }
781
782 /*
783  * pobj maybe NULL
784  * has mdd_write_lock on cobj already, but not on pobj yet
785  */
786 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
787                             struct mdd_object *cobj, struct md_attr *ma)
788 {
789         int rc;
790         ENTRY;
791
792         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
793
794         RETURN(rc);
795 }
796
797 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
798                       struct md_object *cobj, const struct lu_name *lname,
799                       struct md_attr *ma)
800 {
801         const char *name = lname->ln_name;
802         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
803         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
804         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
805         struct mdd_device *mdd = mdo2mdd(pobj);
806         struct dynlock_handle *dlh;
807         struct thandle    *handle;
808 #ifdef HAVE_QUOTA_SUPPORT
809         struct obd_device *obd = mdd->mdd_obd_dev;
810         struct mds_obd *mds = &obd->u.mds;
811         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
812         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
813         int quota_opc = 0;
814 #endif
815         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
816         int rc;
817         ENTRY;
818
819         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
820                  PFID(mdd_object_fid(mdd_cobj)));
821
822         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
823         if (rc)
824                 RETURN(rc);
825
826         handle = mdd_trans_start(env, mdd);
827         if (IS_ERR(handle))
828                 RETURN(PTR_ERR(handle));
829
830         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
831         if (dlh == NULL)
832                 GOTO(out_trans, rc = -ENOMEM);
833         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
834
835         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
836         if (rc)
837                 GOTO(cleanup, rc);
838
839         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
840                                 mdd_object_capa(env, mdd_pobj));
841         if (rc)
842                 GOTO(cleanup, rc);
843
844         __mdd_ref_del(env, mdd_cobj, handle, 0);
845         if (is_dir)
846                 /* unlink dot */
847                 __mdd_ref_del(env, mdd_cobj, handle, 1);
848
849         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
850         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
851
852         la->la_valid = LA_CTIME | LA_MTIME;
853         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
854         if (rc)
855                 GOTO(cleanup, rc);
856
857         la->la_valid = LA_CTIME;
858         rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
859         if (rc)
860                 GOTO(cleanup, rc);
861
862         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
863 #ifdef HAVE_QUOTA_SUPPORT
864         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
865             ma->ma_attr.la_nlink == 0) {
866                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
867
868                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
869                 if (!rc) {
870                         mdd_quota_wrapper(la_tmp, qpids);
871                         if (mdd_cobj->mod_count == 0) {
872                                 quota_opc = FSFILT_OP_UNLINK;
873                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
874                         } else {
875                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
876                         }
877                 }
878         }
879 #endif
880
881         if (rc == 0)
882                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
883                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
884                                    NULL, NULL);
885         if (!is_dir)
886                 /* old files may not have link ea; ignore errors */
887                 mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj),
888                                  lname, NULL, NULL, handle);
889
890         EXIT;
891 cleanup:
892         mdd_write_unlock(env, mdd_cobj);
893         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
894 out_trans:
895         if (rc == 0)
896                 rc = mdd_changelog_ns_store(env, mdd,
897                                             is_dir ? CL_RMDIR : CL_UNLINK,
898                                             mdd_cobj, mdd_pobj, NULL, lname,
899                                             handle);
900
901         mdd_trans_stop(env, mdd, rc, handle);
902 #ifdef HAVE_QUOTA_SUPPORT
903         if (quota_opc)
904                 /* Trigger dqrel on the owner of child and parent. If failed,
905                  * the next call for lquota_chkquota will process it. */
906                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
907                               quota_opc);
908 #endif
909         return rc;
910 }
911
912 /* has not lock on pobj yet */
913 static int mdd_ni_sanity_check(const struct lu_env *env,
914                                struct md_object *pobj,
915                                const struct md_attr *ma)
916 {
917         struct mdd_object *obj = md2mdd_obj(pobj);
918         int rc;
919         ENTRY;
920
921         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
922                 RETURN(0);
923
924         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
925
926         RETURN(rc);
927 }
928
929 /*
930  * Partial operation.
931  */
932 static int mdd_name_insert(const struct lu_env *env,
933                            struct md_object *pobj,
934                            const struct lu_name *lname,
935                            const struct lu_fid *fid,
936                            const struct md_attr *ma)
937 {
938         const char *name = lname->ln_name;
939         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
940         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
941         struct mdd_device *mdd = mdo2mdd(pobj);
942         struct dynlock_handle *dlh;
943         struct thandle *handle;
944         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
945 #ifdef HAVE_QUOTA_SUPPORT
946         struct md_ucred *uc = md_ucred(env);
947         struct obd_device *obd = mdd->mdd_obd_dev;
948         struct mds_obd *mds = &obd->u.mds;
949         unsigned int qids[MAXQUOTAS] = { 0, 0 };
950         int quota_opc = 0, rec_pending = 0;
951         cfs_cap_t save = uc->mu_cap;
952 #endif
953         int rc;
954         ENTRY;
955
956 #ifdef HAVE_QUOTA_SUPPORT
957         if (mds->mds_quota) {
958                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
959                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
960
961                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
962                         if (!rc) {
963                                 quota_opc = FSFILT_OP_LINK;
964                                 mdd_quota_wrapper(la_tmp, qids);
965                                 /* get block quota for parent */
966                                 lquota_chkquota(mds_quota_interface_ref, obd,
967                                                 qids[USRQUOTA], qids[GRPQUOTA],
968                                                 1, &rec_pending, NULL,
969                                                 LQUOTA_FLAGS_BLK, NULL, 0);
970                         }
971                 } else {
972                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
973                 }
974         }
975 #endif
976         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
977         handle = mdd_trans_start(env, mdo2mdd(pobj));
978         if (IS_ERR(handle))
979                 GOTO(out_pending, rc = PTR_ERR(handle));
980
981         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
982         if (dlh == NULL)
983                 GOTO(out_trans, rc = -ENOMEM);
984
985         rc = mdd_ni_sanity_check(env, pobj, ma);
986         if (rc)
987                 GOTO(out_unlock, rc);
988
989         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
990                                 handle, BYPASS_CAPA);
991         if (rc)
992                 GOTO(out_unlock, rc);
993
994         /*
995          * For some case, no need update obj's ctime (LA_CTIME is not set),
996          * e.g. split_dir.
997          * For other cases, update obj's ctime (LA_CTIME is set),
998          * e.g. cmr_link.
999          */
1000         if (ma->ma_attr.la_valid & LA_CTIME) {
1001                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1002                 la->la_valid = LA_CTIME | LA_MTIME;
1003                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1004                                                         handle, 0);
1005         }
1006         EXIT;
1007 out_unlock:
1008         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1009 out_trans:
1010         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
1011 out_pending:
1012 #ifdef HAVE_QUOTA_SUPPORT
1013         if (mds->mds_quota) {
1014                 if (quota_opc) {
1015                         if (rec_pending)
1016                                 lquota_pending_commit(mds_quota_interface_ref,
1017                                                       obd, qids[USRQUOTA],
1018                                                       qids[GRPQUOTA],
1019                                                       rec_pending, 1);
1020                         /* Trigger dqacq for the parent owner. If failed,
1021                          * the next call for lquota_chkquota will process it*/
1022                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
1023                                       rc, quota_opc);
1024                 } else {
1025                         uc->mu_cap = save;
1026                 }
1027         }
1028 #endif
1029         return rc;
1030 }
1031
1032 /* has not lock on pobj yet */
1033 static int mdd_nr_sanity_check(const struct lu_env *env,
1034                                struct md_object *pobj,
1035                                const struct md_attr *ma)
1036 {
1037         struct mdd_object *obj = md2mdd_obj(pobj);
1038         int rc;
1039         ENTRY;
1040
1041         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
1042                 RETURN(0);
1043
1044         rc = mdd_may_unlink(env, obj, ma);
1045
1046         RETURN(rc);
1047 }
1048
1049 /*
1050  * Partial operation.
1051  */
1052 static int mdd_name_remove(const struct lu_env *env,
1053                            struct md_object *pobj,
1054                            const struct lu_name *lname,
1055                            const struct md_attr *ma)
1056 {
1057         const char *name = lname->ln_name;
1058         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1059         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1060         struct mdd_device *mdd = mdo2mdd(pobj);
1061         struct dynlock_handle *dlh;
1062         struct thandle *handle;
1063         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1064 #ifdef HAVE_QUOTA_SUPPORT
1065         struct obd_device *obd = mdd->mdd_obd_dev;
1066         struct mds_obd *mds = &obd->u.mds;
1067         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1068         int quota_opc = 0;
1069 #endif
1070         int rc;
1071         ENTRY;
1072
1073 #ifdef HAVE_QUOTA_SUPPORT
1074         if (mds->mds_quota) {
1075                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1076
1077                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1078                 if (!rc) {
1079                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
1080                         mdd_quota_wrapper(la_tmp, qids);
1081                 }
1082         }
1083 #endif
1084         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
1085         handle = mdd_trans_start(env, mdd);
1086         if (IS_ERR(handle))
1087                 GOTO(out_pending, rc = PTR_ERR(handle));
1088
1089         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1090         if (dlh == NULL)
1091                 GOTO(out_trans, rc = -ENOMEM);
1092
1093         rc = mdd_nr_sanity_check(env, pobj, ma);
1094         if (rc)
1095                 GOTO(out_unlock, rc);
1096
1097         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1098                                 handle, BYPASS_CAPA);
1099         if (rc)
1100                 GOTO(out_unlock, rc);
1101
1102         /*
1103          * For some case, no need update obj's ctime (LA_CTIME is not set),
1104          * e.g. split_dir.
1105          * For other cases, update obj's ctime (LA_CTIME is set),
1106          * e.g. cmr_unlink.
1107          */
1108         if (ma->ma_attr.la_valid & LA_CTIME) {
1109                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1110                 la->la_valid = LA_CTIME | LA_MTIME;
1111                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1112                                                         handle, 0);
1113         }
1114         EXIT;
1115 out_unlock:
1116         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1117 out_trans:
1118         mdd_trans_stop(env, mdd, rc, handle);
1119 out_pending:
1120 #ifdef HAVE_QUOTA_SUPPORT
1121         /* Trigger dqrel for the parent owner.
1122          * If failed, the next call for lquota_chkquota will process it. */
1123         if (quota_opc)
1124                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1125                               quota_opc);
1126 #endif
1127         return rc;
1128 }
1129
1130 /*
1131  * tobj maybe NULL
1132  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1133  */
1134 static int mdd_rt_sanity_check(const struct lu_env *env,
1135                                struct mdd_object *tgt_pobj,
1136                                struct mdd_object *tobj,
1137                                struct md_attr *ma)
1138 {
1139         int rc;
1140         ENTRY;
1141
1142         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1143                 RETURN(0);
1144
1145         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1146          * exist. In fact, tobj must exist, otherwise the call trace will be:
1147          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1148          * When get here, tobj must be NOT NULL, the other case has been
1149          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1150          * MDS_PERM_BYPASS.
1151          * So check may_delete, but not check nlink of tgt_pobj. */
1152         LASSERT(tobj);
1153         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1154
1155         RETURN(rc);
1156 }
1157
1158 /* Partial rename op on slave MDD */
1159 static int mdd_rename_tgt(const struct lu_env *env,
1160                           struct md_object *pobj, struct md_object *tobj,
1161                           const struct lu_fid *lf, const struct lu_name *lname,
1162                           struct md_attr *ma)
1163 {
1164         const char *name = lname->ln_name;
1165         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1166         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1167         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1168         struct mdd_device *mdd = mdo2mdd(pobj);
1169         struct dynlock_handle *dlh;
1170         struct thandle *handle;
1171 #ifdef HAVE_QUOTA_SUPPORT
1172         struct obd_device *obd = mdd->mdd_obd_dev;
1173         struct mds_obd *mds = &obd->u.mds;
1174         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1175         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1176         int quota_opc = 0, rec_pending = 0;
1177 #endif
1178         int rc;
1179         ENTRY;
1180
1181 #ifdef HAVE_QUOTA_SUPPORT
1182         if (mds->mds_quota && !tobj) {
1183                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1184
1185                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1186                 if (!rc) {
1187                         quota_opc = FSFILT_OP_LINK;
1188                         mdd_quota_wrapper(la_tmp, qpids);
1189                         /* get block quota for target parent */
1190                         lquota_chkquota(mds_quota_interface_ref, obd,
1191                                         qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1192                                         &rec_pending, NULL, LQUOTA_FLAGS_BLK,
1193                                         NULL, 0);
1194                 }
1195         }
1196 #endif
1197         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
1198         handle = mdd_trans_start(env, mdd);
1199         if (IS_ERR(handle))
1200                 GOTO(out_pending, rc = PTR_ERR(handle));
1201
1202         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1203         if (dlh == NULL)
1204                 GOTO(out_trans, rc = -ENOMEM);
1205         if (tobj)
1206                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1207
1208         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1209         if (rc)
1210                 GOTO(cleanup, rc);
1211
1212         /*
1213          * If rename_tgt is called then we should just re-insert name with
1214          * correct fid, no need to dec/inc parent nlink if obj is dir.
1215          */
1216         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1217         if (rc)
1218                 GOTO(cleanup, rc);
1219
1220         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1221                                      BYPASS_CAPA);
1222         if (rc)
1223                 GOTO(cleanup, rc);
1224
1225         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1226         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1227
1228         la->la_valid = LA_CTIME | LA_MTIME;
1229         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1230         if (rc)
1231                 GOTO(cleanup, rc);
1232
1233         /*
1234          * For tobj is remote case cmm layer has processed
1235          * and pass NULL tobj to here. So when tobj is NOT NULL,
1236          * it must be local one.
1237          */
1238         if (tobj && mdd_object_exists(mdd_tobj)) {
1239                 __mdd_ref_del(env, mdd_tobj, handle, 0);
1240
1241                 /* Remove dot reference. */
1242                 if (S_ISDIR(ma->ma_attr.la_mode))
1243                         __mdd_ref_del(env, mdd_tobj, handle, 1);
1244
1245                 la->la_valid = LA_CTIME;
1246                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1247                 if (rc)
1248                         GOTO(cleanup, rc);
1249
1250                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1251                 if (rc)
1252                         GOTO(cleanup, rc);
1253
1254 #ifdef HAVE_QUOTA_SUPPORT
1255                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1256                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1257                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1258                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1259                 }
1260 #endif
1261         }
1262         EXIT;
1263 cleanup:
1264         if (tobj)
1265                 mdd_write_unlock(env, mdd_tobj);
1266         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1267 out_trans:
1268         if (rc == 0)
1269                 /* Bare EXT record with no RENAME in front of it signifies
1270                    a partial slave op */
1271                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
1272                                             mdd_tpobj, NULL, lname, handle);
1273
1274         mdd_trans_stop(env, mdd, rc, handle);
1275 out_pending:
1276 #ifdef HAVE_QUOTA_SUPPORT
1277         if (mds->mds_quota) {
1278                 if (rec_pending)
1279                         lquota_pending_commit(mds_quota_interface_ref, obd,
1280                                               qpids[USRQUOTA],
1281                                               qpids[GRPQUOTA],
1282                                               rec_pending, 1);
1283                 if (quota_opc)
1284                         /* Trigger dqrel/dqacq on the target owner of child and
1285                          * parent. If failed, the next call for lquota_chkquota
1286                          * will process it. */
1287                         lquota_adjust(mds_quota_interface_ref, obd, qcids,
1288                                       qpids, rc, quota_opc);
1289         }
1290 #endif
1291         return rc;
1292 }
1293
1294 /*
1295  * The permission has been checked when obj created, no need check again.
1296  */
1297 static int mdd_cd_sanity_check(const struct lu_env *env,
1298                                struct mdd_object *obj)
1299 {
1300         ENTRY;
1301
1302         /* EEXIST check */
1303         if (!obj || mdd_is_dead_obj(obj))
1304                 RETURN(-ENOENT);
1305
1306         RETURN(0);
1307
1308 }
1309
1310 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1311                            struct md_object *cobj, const struct md_op_spec *spec,
1312                            struct md_attr *ma)
1313 {
1314         struct mdd_device *mdd = mdo2mdd(cobj);
1315         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1316         struct mdd_object *son = md2mdd_obj(cobj);
1317         struct lu_attr    *attr = &ma->ma_attr;
1318         struct lov_mds_md *lmm = NULL;
1319         int                lmm_size = 0;
1320         struct thandle    *handle;
1321         int                rc;
1322         ENTRY;
1323
1324         rc = mdd_cd_sanity_check(env, son);
1325         if (rc)
1326                 RETURN(rc);
1327
1328         if (!md_should_create(spec->sp_cr_flags))
1329                 RETURN(0);
1330
1331         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1332                             spec, attr);
1333         if (rc)
1334                 RETURN(rc);
1335
1336         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
1337         handle = mdd_trans_start(env, mdd);
1338         if (IS_ERR(handle))
1339                 GOTO(out_free, rc = PTR_ERR(handle));
1340
1341         /*
1342          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1343          * Should this be fixed?
1344          */
1345
1346         /* Replay creates has objects already */
1347 #if 0
1348         if (spec->no_create) {
1349                 CDEBUG(D_INFO, "we already have lov ea\n");
1350                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1351                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1352                                     spec->u.sp_ea.eadatalen, handle, 0);
1353         } else
1354 #endif
1355                 /* No need mdd_lsm_sanity_check here */
1356                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1357                                     lmm_size, handle, 0);
1358
1359         if (rc == 0)
1360                rc = mdd_attr_get_internal_locked(env, son, ma);
1361
1362         /* update lov_objid data, must be before transaction stop! */
1363         if (rc == 0)
1364                 mdd_lov_objid_update(mdd, lmm);
1365
1366         mdd_trans_stop(env, mdd, rc, handle);
1367 out_free:
1368         /* Finish mdd_lov_create() stuff. */
1369         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1370         RETURN(rc);
1371 }
1372
1373 /* Get fid from name and parent */
1374 static int
1375 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1376              const struct lu_name *lname, struct lu_fid* fid, int mask)
1377 {
1378         const char          *name = lname->ln_name;
1379         const struct dt_key *key = (const struct dt_key *)name;
1380         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1381         struct mdd_device   *m = mdo2mdd(pobj);
1382         struct dt_object    *dir = mdd_object_child(mdd_obj);
1383         struct lu_fid_pack  *pack = &mdd_env_info(env)->mti_pack;
1384         int rc;
1385         ENTRY;
1386
1387         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1388                 RETURN(-ESTALE);
1389
1390         rc = mdd_object_exists(mdd_obj);
1391         if (unlikely(rc == 0))
1392                 RETURN(-ESTALE);
1393         else if (unlikely(rc < 0)) {
1394                 CERROR("Object "DFID" locates on remote server\n",
1395                         PFID(mdo2fid(mdd_obj)));
1396                 LBUG();
1397         }
1398
1399         /* The common filename length check. */
1400         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1401                 RETURN(-ENAMETOOLONG);
1402
1403         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1404                                             MOR_TGT_PARENT);
1405         if (rc)
1406                 RETURN(rc);
1407
1408         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1409                    dt_try_as_dir(env, dir))) {
1410
1411                 rc = dir->do_index_ops->dio_lookup(env, dir,
1412                                                  (struct dt_rec *)pack, key,
1413                                                  mdd_object_capa(env, mdd_obj));
1414                 if (rc > 0)
1415                         rc = fid_unpack(pack, fid);
1416                 else if (rc == 0)
1417                         rc = -ENOENT;
1418         } else
1419                 rc = -ENOTDIR;
1420
1421         RETURN(rc);
1422 }
1423
1424 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1425                           const struct lu_name *lname, struct mdd_object *child,
1426                           struct md_attr *ma, struct thandle *handle,
1427                           const struct md_op_spec *spec)
1428 {
1429         int rc;
1430         ENTRY;
1431
1432         /*
1433          * Update attributes for child.
1434          *
1435          * FIXME:
1436          *  (1) the valid bits should be converted between Lustre and Linux;
1437          *  (2) maybe, the child attributes should be set in OSD when creation.
1438          */
1439
1440         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1441         if (rc != 0)
1442                 RETURN(rc);
1443
1444         if (S_ISDIR(ma->ma_attr.la_mode)) {
1445                 /* Add "." and ".." for newly created dir */
1446                 __mdd_ref_add(env, child, handle);
1447                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1448                                              dot, handle, BYPASS_CAPA);
1449                 if (rc == 0) {
1450                         rc = __mdd_index_insert_only(env, child, pfid,
1451                                                      dotdot, handle,
1452                                                      BYPASS_CAPA);
1453                         if (rc != 0) {
1454                                 int rc2;
1455
1456                                 rc2 = __mdd_index_delete(env, child, dot, 1,
1457                                                          handle, BYPASS_CAPA);
1458                                 if (rc2 != 0)
1459                                         CERROR("Failure to cleanup after dotdot"
1460                                                " creation: %d (%d)\n", rc2, rc);
1461                         }
1462                 }
1463         }
1464         if (rc == 0)
1465                 mdd_links_add(env, child, pfid, lname, handle);
1466
1467         RETURN(rc);
1468 }
1469
1470 /* has not lock on pobj yet */
1471 static int mdd_create_sanity_check(const struct lu_env *env,
1472                                    struct md_object *pobj,
1473                                    const struct lu_name *lname,
1474                                    struct md_attr *ma,
1475                                    struct md_op_spec *spec)
1476 {
1477         struct mdd_thread_info *info = mdd_env_info(env);
1478         struct lu_attr    *la        = &info->mti_la;
1479         struct lu_fid     *fid       = &info->mti_fid;
1480         struct mdd_object *obj       = md2mdd_obj(pobj);
1481         struct mdd_device *m         = mdo2mdd(pobj);
1482         int lookup                   = spec->sp_cr_lookup;
1483         int rc;
1484         ENTRY;
1485
1486         /* EEXIST check */
1487         if (mdd_is_dead_obj(obj))
1488                 RETURN(-ENOENT);
1489
1490         /*
1491          * In some cases this lookup is not needed - we know before if name
1492          * exists or not because MDT performs lookup for it.
1493          * name length check is done in lookup.
1494          */
1495         if (lookup) {
1496                 /*
1497                  * Check if the name already exist, though it will be checked in
1498                  * _index_insert also, for avoiding rolling back if exists
1499                  * _index_insert.
1500                  */
1501                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1502                                          MAY_WRITE | MAY_EXEC);
1503                 if (rc != -ENOENT)
1504                         RETURN(rc ? : -EEXIST);
1505         } else {
1506                 /*
1507                  * Check WRITE permission for the parent.
1508                  * EXEC permission have been checked
1509                  * when lookup before create already.
1510                  */
1511                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1512                                                     MOR_TGT_PARENT);
1513                 if (rc)
1514                         RETURN(rc);
1515         }
1516
1517         /* sgid check */
1518         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1519         if (rc != 0)
1520                 RETURN(rc);
1521
1522         if (la->la_mode & S_ISGID) {
1523                 ma->ma_attr.la_gid = la->la_gid;
1524                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1525                         ma->ma_attr.la_mode |= S_ISGID;
1526                         ma->ma_attr.la_valid |= LA_MODE;
1527                 }
1528         }
1529
1530         switch (ma->ma_attr.la_mode & S_IFMT) {
1531         case S_IFLNK: {
1532                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1533
1534                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1535                         RETURN(-ENAMETOOLONG);
1536                 else
1537                         RETURN(0);
1538         }
1539         case S_IFDIR:
1540         case S_IFREG:
1541         case S_IFCHR:
1542         case S_IFBLK:
1543         case S_IFIFO:
1544         case S_IFSOCK:
1545                 rc = 0;
1546                 break;
1547         default:
1548                 rc = -EINVAL;
1549                 break;
1550         }
1551         RETURN(rc);
1552 }
1553
1554 /*
1555  * Create object and insert it into namespace.
1556  */
1557 static int mdd_create(const struct lu_env *env,
1558                       struct md_object *pobj,
1559                       const struct lu_name *lname,
1560                       struct md_object *child,
1561                       struct md_op_spec *spec,
1562                       struct md_attr* ma)
1563 {
1564         struct mdd_thread_info *info = mdd_env_info(env);
1565         struct lu_attr         *la = &info->mti_la_for_fix;
1566         struct md_attr         *ma_acl = &info->mti_ma;
1567         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1568         struct mdd_object      *son = md2mdd_obj(child);
1569         struct mdd_device      *mdd = mdo2mdd(pobj);
1570         struct lu_attr         *attr = &ma->ma_attr;
1571         struct lov_mds_md      *lmm = NULL;
1572         struct thandle         *handle;
1573         struct dynlock_handle  *dlh;
1574         const char             *name = lname->ln_name;
1575         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1576         int got_def_acl = 0;
1577 #ifdef HAVE_QUOTA_SUPPORT
1578         struct obd_device *obd = mdd->mdd_obd_dev;
1579         struct mds_obd *mds = &obd->u.mds;
1580         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1581         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1582         int quota_opc = 0, block_count = 0;
1583         int inode_pending = 0, block_pending = 0, parent_pending = 0;
1584 #endif
1585         ENTRY;
1586
1587         /*
1588          * Two operations have to be performed:
1589          *
1590          *  - an allocation of a new object (->do_create()), and
1591          *
1592          *  - an insertion into a parent index (->dio_insert()).
1593          *
1594          * Due to locking, operation order is not important, when both are
1595          * successful, *but* error handling cases are quite different:
1596          *
1597          *  - if insertion is done first, and following object creation fails,
1598          *  insertion has to be rolled back, but this operation might fail
1599          *  also leaving us with dangling index entry.
1600          *
1601          *  - if creation is done first, is has to be undone if insertion
1602          *  fails, leaving us with leaked space, which is neither good, nor
1603          *  fatal.
1604          *
1605          * It seems that creation-first is simplest solution, but it is
1606          * sub-optimal in the frequent
1607          *
1608          *         $ mkdir foo
1609          *         $ mkdir foo
1610          *
1611          * case, because second mkdir is bound to create object, only to
1612          * destroy it immediately.
1613          *
1614          * To avoid this follow local file systems that do double lookup:
1615          *
1616          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1617          *
1618          *     1. create            (mdd_object_create_internal())
1619          *
1620          *     2. insert            (__mdd_index_insert(), lookup again)
1621          */
1622
1623         /* Sanity checks before big job. */
1624         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1625         if (rc)
1626                 RETURN(rc);
1627
1628 #ifdef HAVE_QUOTA_SUPPORT
1629         if (mds->mds_quota) {
1630                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1631
1632                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1633                 if (!rc) {
1634                         int same = 0;
1635
1636                         quota_opc = FSFILT_OP_CREATE;
1637                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1638                         mdd_quota_wrapper(la_tmp, qpids);
1639                         /* get file quota for child */
1640                         lquota_chkquota(mds_quota_interface_ref, obd,
1641                                         qcids[USRQUOTA], qcids[GRPQUOTA], 1,
1642                                         &inode_pending, NULL, 0, NULL, 0);
1643                         switch (ma->ma_attr.la_mode & S_IFMT) {
1644                         case S_IFLNK:
1645                         case S_IFDIR:
1646                                 block_count = 2;
1647                                 break;
1648                         case S_IFREG:
1649                                 block_count = 1;
1650                                 break;
1651                         }
1652                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1653                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1654                                 block_count += 1;
1655                                 same = 1;
1656                         }
1657                         /* get block quota for child and parent */
1658                         if (block_count)
1659                                 lquota_chkquota(mds_quota_interface_ref, obd,
1660                                                 qcids[USRQUOTA], qcids[GRPQUOTA],
1661                                                 block_count,
1662                                                 &block_pending, NULL,
1663                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1664                         if (!same)
1665                                 lquota_chkquota(mds_quota_interface_ref, obd,
1666                                                 qpids[USRQUOTA], qpids[GRPQUOTA], 1,
1667                                                 &parent_pending, NULL,
1668                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1669                 }
1670         }
1671 #endif
1672
1673         /*
1674          * No RPC inside the transaction, so OST objects should be created at
1675          * first.
1676          */
1677         if (S_ISREG(attr->la_mode)) {
1678                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1679                                     spec, attr);
1680                 if (rc)
1681                         GOTO(out_pending, rc);
1682         }
1683
1684         if (!S_ISLNK(attr->la_mode)) {
1685                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
1686                 ma_acl->ma_acl = info->mti_xattr_buf;
1687                 ma_acl->ma_need = MA_ACL_DEF;
1688                 ma_acl->ma_valid = 0;
1689
1690                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
1691                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
1692                 mdd_read_unlock(env, mdd_pobj);
1693                 if (rc)
1694                         GOTO(out_free, rc);
1695                 else if (ma_acl->ma_valid & MA_ACL_DEF)
1696                         got_def_acl = 1;
1697         }
1698
1699         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1700         handle = mdd_trans_start(env, mdd);
1701         if (IS_ERR(handle))
1702                 GOTO(out_free, rc = PTR_ERR(handle));
1703
1704         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1705         if (dlh == NULL)
1706                 GOTO(out_trans, rc = -ENOMEM);
1707
1708         mdd_write_lock(env, son, MOR_TGT_CHILD);
1709         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
1710         if (rc) {
1711                 mdd_write_unlock(env, son);
1712                 GOTO(cleanup, rc);
1713         }
1714
1715         created = 1;
1716
1717 #ifdef CONFIG_FS_POSIX_ACL
1718         if (got_def_acl) {
1719                 struct lu_buf *acl_buf = &info->mti_buf;
1720                 acl_buf->lb_buf = ma_acl->ma_acl;
1721                 acl_buf->lb_len = ma_acl->ma_acl_size;
1722
1723                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
1724                 if (rc) {
1725                         mdd_write_unlock(env, son);
1726                         GOTO(cleanup, rc);
1727                 } else {
1728                         ma->ma_attr.la_valid |= LA_MODE;
1729                 }
1730         }
1731 #endif
1732
1733         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
1734                                    son, ma, handle, spec);
1735         mdd_write_unlock(env, son);
1736         if (rc)
1737                 /*
1738                  * Object has no links, so it will be destroyed when last
1739                  * reference is released. (XXX not now.)
1740                  */
1741                 GOTO(cleanup, rc);
1742
1743         initialized = 1;
1744
1745         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1746                                 name, S_ISDIR(attr->la_mode), handle,
1747                                 mdd_object_capa(env, mdd_pobj));
1748
1749         if (rc)
1750                 GOTO(cleanup, rc);
1751
1752         inserted = 1;
1753
1754         /* No need mdd_lsm_sanity_check here */
1755         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1756         if (rc) {
1757                 CERROR("error on stripe info copy %d \n", rc);
1758                 GOTO(cleanup, rc);
1759         }
1760         if (lmm && lmm_size > 0) {
1761                 /* Set Lov here, do not get lmm again later */
1762                 memcpy(ma->ma_lmm, lmm, lmm_size);
1763                 ma->ma_lmm_size = lmm_size;
1764                 ma->ma_valid |= MA_LOV;
1765         }
1766
1767         if (S_ISLNK(attr->la_mode)) {
1768                 struct md_ucred  *uc = md_ucred(env);
1769                 struct dt_object *dt = mdd_object_child(son);
1770                 const char *target_name = spec->u.sp_symname;
1771                 int sym_len = strlen(target_name);
1772                 const struct lu_buf *buf;
1773                 loff_t pos = 0;
1774
1775                 buf = mdd_buf_get_const(env, target_name, sym_len);
1776                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1777                                                 mdd_object_capa(env, son),
1778                                                 uc->mu_cap &
1779                                                 CFS_CAP_SYS_RESOURCE_MASK);
1780
1781                 if (rc == sym_len)
1782                         rc = 0;
1783                 else
1784                         GOTO(cleanup, rc = -EFAULT);
1785         }
1786
1787         *la = ma->ma_attr;
1788         la->la_valid = LA_CTIME | LA_MTIME;
1789         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1790         if (rc)
1791                 GOTO(cleanup, rc);
1792
1793         /* Return attr back. */
1794         rc = mdd_attr_get_internal_locked(env, son, ma);
1795         EXIT;
1796 cleanup:
1797         if (rc && created) {
1798                 int rc2 = 0;
1799
1800                 if (inserted) {
1801                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1802                                                  S_ISDIR(attr->la_mode),
1803                                                  handle, BYPASS_CAPA);
1804                         if (rc2)
1805                                 CERROR("error can not cleanup destroy %d\n",
1806                                        rc2);
1807                 }
1808
1809                 if (rc2 == 0) {
1810                         mdd_write_lock(env, son, MOR_TGT_CHILD);
1811                         __mdd_ref_del(env, son, handle, 0);
1812                         if (initialized && S_ISDIR(attr->la_mode))
1813                                 __mdd_ref_del(env, son, handle, 1);
1814                         mdd_write_unlock(env, son);
1815                 }
1816         }
1817
1818         /* update lov_objid data, must be before transaction stop! */
1819         if (rc == 0)
1820                 mdd_lov_objid_update(mdd, lmm);
1821
1822         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1823 out_trans:
1824         if (rc == 0)
1825                 rc = mdd_changelog_ns_store(env, mdd,
1826                             S_ISDIR(attr->la_mode) ? CL_MKDIR :
1827                             S_ISREG(attr->la_mode) ? CL_CREATE :
1828                             S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
1829                             son, mdd_pobj, NULL, lname, handle);
1830         mdd_trans_stop(env, mdd, rc, handle);
1831 out_free:
1832         /* finis lov_create stuff, free all temporary data */
1833         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1834 out_pending:
1835 #ifdef HAVE_QUOTA_SUPPORT
1836         if (quota_opc) {
1837                 if (inode_pending)
1838                         lquota_pending_commit(mds_quota_interface_ref, obd,
1839                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1840                                               inode_pending, 0);
1841                 if (block_pending)
1842                         lquota_pending_commit(mds_quota_interface_ref, obd,
1843                                               qcids[USRQUOTA], qcids[GRPQUOTA],
1844                                               block_pending, 1);
1845                 if (parent_pending)
1846                         lquota_pending_commit(mds_quota_interface_ref, obd,
1847                                               qpids[USRQUOTA], qpids[GRPQUOTA],
1848                                               parent_pending, 1);
1849                 /* Trigger dqacq on the owner of child and parent. If failed,
1850                  * the next call for lquota_chkquota will process it. */
1851                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1852                               quota_opc);
1853         }
1854 #endif
1855         return rc;
1856 }
1857
1858 /*
1859  * Get locks on parents in proper order
1860  * RETURN: < 0 - error, rename_order if successful
1861  */
1862 enum rename_order {
1863         MDD_RN_SAME,
1864         MDD_RN_SRCTGT,
1865         MDD_RN_TGTSRC
1866 };
1867
1868 static int mdd_rename_order(const struct lu_env *env,
1869                             struct mdd_device *mdd,
1870                             struct mdd_object *src_pobj,
1871                             struct mdd_object *tgt_pobj)
1872 {
1873         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1874         int rc;
1875         ENTRY;
1876
1877         if (src_pobj == tgt_pobj)
1878                 RETURN(MDD_RN_SAME);
1879
1880         /* compared the parent child relationship of src_p&tgt_p */
1881         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1882                 rc = MDD_RN_SRCTGT;
1883         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1884                 rc = MDD_RN_TGTSRC;
1885         } else {
1886                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1887                 if (rc == -EREMOTE)
1888                         rc = 0;
1889
1890                 if (rc == 1)
1891                         rc = MDD_RN_TGTSRC;
1892                 else if (rc == 0)
1893                         rc = MDD_RN_SRCTGT;
1894         }
1895
1896         RETURN(rc);
1897 }
1898
1899 /* has not mdd_write{read}_lock on any obj yet. */
1900 static int mdd_rename_sanity_check(const struct lu_env *env,
1901                                    struct mdd_object *src_pobj,
1902                                    struct mdd_object *tgt_pobj,
1903                                    struct mdd_object *sobj,
1904                                    struct mdd_object *tobj,
1905                                    struct md_attr *ma)
1906 {
1907         int rc = 0;
1908         ENTRY;
1909
1910         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1911                 RETURN(0);
1912
1913         /* XXX: when get here, sobj must NOT be NULL,
1914          * the other case has been processed in cml_rename
1915          * before mdd_rename and enable MDS_PERM_BYPASS. */
1916         LASSERT(sobj);
1917         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
1918         if (rc)
1919                 RETURN(rc);
1920
1921         /* XXX: when get here, "tobj == NULL" means tobj must
1922          * NOT exist (neither on remote MDS, such case has been
1923          * processed in cml_rename before mdd_rename and enable
1924          * MDS_PERM_BYPASS).
1925          * So check may_create, but not check may_unlink. */
1926         if (!tobj)
1927                 rc = mdd_may_create(env, tgt_pobj, NULL,
1928                                     (src_pobj != tgt_pobj), 0);
1929         else
1930                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
1931                                     (src_pobj != tgt_pobj), 1);
1932
1933         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
1934             S_ISDIR(ma->ma_attr.la_mode))
1935                 rc = __mdd_may_link(env, tgt_pobj);
1936
1937         RETURN(rc);
1938 }
1939
1940 /* src object can be remote that is why we use only fid and type of object */
1941 static int mdd_rename(const struct lu_env *env,
1942                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1943                       const struct lu_fid *lf, const struct lu_name *lsname,
1944                       struct md_object *tobj, const struct lu_name *ltname,
1945                       struct md_attr *ma)
1946 {
1947         const char *sname = lsname->ln_name;
1948         const char *tname = ltname->ln_name;
1949         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1950         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
1951         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1952         struct mdd_device *mdd = mdo2mdd(src_pobj);
1953         struct mdd_object *mdd_sobj = NULL;                  /* source object */
1954         struct mdd_object *mdd_tobj = NULL;
1955         struct dynlock_handle *sdlh, *tdlh;
1956         struct thandle *handle;
1957         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
1958         int is_dir;
1959         int rc;
1960
1961 #ifdef HAVE_QUOTA_SUPPORT
1962         struct obd_device *obd = mdd->mdd_obd_dev;
1963         struct mds_obd *mds = &obd->u.mds;
1964         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
1965         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
1966         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
1967         int quota_opc = 0, rec_pending = 0;
1968 #endif
1969         ENTRY;
1970
1971         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1972         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1973
1974         if (tobj)
1975                 mdd_tobj = md2mdd_obj(tobj);
1976
1977 #ifdef HAVE_QUOTA_SUPPORT
1978         if (mds->mds_quota) {
1979                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1980
1981                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
1982                 if (!rc) {
1983                         mdd_quota_wrapper(la_tmp, qspids);
1984                         if (!tobj) {
1985                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
1986                                                 BYPASS_CAPA);
1987                                 if (!rc) {
1988                                         quota_opc = FSFILT_OP_LINK;
1989                                         mdd_quota_wrapper(la_tmp, qtpids);
1990                                         /* get block quota for target parent */
1991                                         lquota_chkquota(mds_quota_interface_ref,
1992                                                         obd, qtpids[USRQUOTA],
1993                                                         qtpids[GRPQUOTA], 1,
1994                                                         &rec_pending, NULL,
1995                                                         LQUOTA_FLAGS_BLK,
1996                                                         NULL, 0);
1997                                 }
1998                         }
1999                 }
2000         }
2001 #endif
2002         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
2003         handle = mdd_trans_start(env, mdd);
2004         if (IS_ERR(handle))
2005                 GOTO(out_pending, rc = PTR_ERR(handle));
2006
2007         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2008         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2009         if (rc < 0)
2010                 GOTO(cleanup_unlocked, rc);
2011
2012         /* Get locks in determined order */
2013         if (rc == MDD_RN_SAME) {
2014                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2015                                           sname, MOR_SRC_PARENT);
2016                 /* check hashes to determine do we need one lock or two */
2017                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2018                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2019                                 MOR_TGT_PARENT);
2020                 else
2021                         tdlh = sdlh;
2022         } else if (rc == MDD_RN_SRCTGT) {
2023                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2024                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2025         } else {
2026                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2027                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2028         }
2029         if (sdlh == NULL || tdlh == NULL)
2030                 GOTO(cleanup, rc = -ENOMEM);
2031
2032         mdd_sobj = mdd_object_find(env, mdd, lf);
2033         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2034                                      mdd_sobj, mdd_tobj, ma);
2035         if (rc)
2036                 GOTO(cleanup, rc);
2037
2038         /* Remove source name from source directory */
2039         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2040                                 mdd_object_capa(env, mdd_spobj));
2041         if (rc)
2042                 GOTO(cleanup, rc);
2043
2044         /* "mv dir1 dir2" needs "dir1/.." link update */
2045         if (is_dir && mdd_sobj) {
2046                 rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
2047                                         mdd_object_capa(env, mdd_spobj));
2048                 if (rc)
2049                        GOTO(cleanup, rc);
2050
2051                 rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
2052                                         is_dir, handle,
2053                                         mdd_object_capa(env, mdd_tpobj));
2054                 if (rc)
2055                         GOTO(cleanup, rc);
2056         }
2057
2058         /* Remove target name from target directory
2059          * Here tobj can be remote one, so we do index_delete unconditionally
2060          * and -ENOENT is allowed.
2061          */
2062         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2063                                 mdd_object_capa(env, mdd_tpobj));
2064         if (rc != 0 && rc != -ENOENT)
2065                 GOTO(cleanup, rc);
2066
2067         /* Insert new fid with target name into target dir */
2068         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2069                                 mdd_object_capa(env, mdd_tpobj));
2070         if (rc)
2071                 GOTO(cleanup, rc);
2072
2073         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2074         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2075
2076         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2077         if (mdd_sobj) {
2078                 la->la_valid = LA_CTIME;
2079                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
2080                                                         handle, 0);
2081                 if (rc)
2082                         GOTO(cleanup, rc);
2083         }
2084
2085         /* Remove old target object
2086          * For tobj is remote case cmm layer has processed
2087          * and set tobj to NULL then. So when tobj is NOT NULL,
2088          * it must be local one.
2089          */
2090         if (tobj && mdd_object_exists(mdd_tobj)) {
2091                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2092                 __mdd_ref_del(env, mdd_tobj, handle, 0);
2093
2094                 /* Remove dot reference. */
2095                 if (is_dir)
2096                         __mdd_ref_del(env, mdd_tobj, handle, 1);
2097
2098                 la->la_valid = LA_CTIME;
2099                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2100                 if (rc)
2101                         GOTO(cleanup, rc);
2102
2103                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2104                 mdd_write_unlock(env, mdd_tobj);
2105                 if (rc)
2106                         GOTO(cleanup, rc);
2107
2108 #ifdef HAVE_QUOTA_SUPPORT
2109                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2110                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
2111                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2112                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
2113                 }
2114 #endif
2115         }
2116
2117         la->la_valid = LA_CTIME | LA_MTIME;
2118         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
2119         if (rc)
2120                 GOTO(cleanup, rc);
2121
2122         if (mdd_spobj != mdd_tpobj) {
2123                 la->la_valid = LA_CTIME | LA_MTIME;
2124                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
2125                                                   handle, 0);
2126         }
2127
2128         if (rc == 0 && mdd_sobj) {
2129                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2130                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2131                                       mdo2fid(mdd_tpobj), ltname, handle);
2132                 if (rc == -ENOENT)
2133                         /* Old files might not have EA entry */
2134                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2135                                       lsname, handle);
2136                 mdd_write_unlock(env, mdd_sobj);
2137                 /* We don't fail the transaction if the link ea can't be
2138                    updated -- fid2path will use alternate lookup method. */
2139                 rc = 0;
2140         }
2141
2142         EXIT;
2143 cleanup:
2144         if (likely(tdlh) && sdlh != tdlh)
2145                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2146         if (likely(sdlh))
2147                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2148 cleanup_unlocked:
2149         if (rc == 0)
2150                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj,
2151                                             mdd_spobj, lf, lsname, handle);
2152         if (rc == 0)
2153                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
2154                                             mdd_tpobj, lf, ltname, handle);
2155
2156         mdd_trans_stop(env, mdd, rc, handle);
2157         if (mdd_sobj)
2158                 mdd_object_put(env, mdd_sobj);
2159 out_pending:
2160 #ifdef HAVE_QUOTA_SUPPORT
2161         if (mds->mds_quota) {
2162                 if (rec_pending)
2163                         lquota_pending_commit(mds_quota_interface_ref, obd,
2164                                               qtpids[USRQUOTA],
2165                                               qtpids[GRPQUOTA],
2166                                               rec_pending, 1);
2167                 /* Trigger dqrel on the source owner of parent.
2168                  * If failed, the next call for lquota_chkquota will
2169                  * process it. */
2170                 lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2171                               FSFILT_OP_UNLINK_PARTIAL_PARENT);
2172                 if (quota_opc)
2173                         /* Trigger dqrel/dqacq on the target owner of child and
2174                          * parent. If failed, the next call for lquota_chkquota
2175                          * will process it. */
2176                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2177                                       qtpids, rc, quota_opc);
2178         }
2179 #endif
2180         return rc;
2181 }
2182
2183 /** enable/disable storing of hardlink info */
2184 int mdd_linkea_enable = 1;
2185 CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644,
2186                 "record hardlink info in EAs");
2187
2188 /** Read the link EA into a temp buffer.
2189  * Uses the name_buf since it is generally large.
2190  * \retval IS_ERR err
2191  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2192  */
2193 struct lu_buf *mdd_links_get(const struct lu_env *env,
2194                              struct mdd_object *mdd_obj)
2195 {
2196         struct lu_buf *buf;
2197         struct lustre_capa *capa;
2198         struct link_ea_header *leh;
2199         int rc;
2200
2201         /* First try a small buf */
2202         buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2203         if (buf->lb_buf == NULL)
2204                 return ERR_PTR(-ENOMEM);
2205
2206         capa = mdd_object_capa(env, mdd_obj);
2207         rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2208         if (rc == -ERANGE) {
2209                 /* Buf was too small, figure out what we need. */
2210                 buf->lb_buf = NULL;
2211                 buf->lb_len = 0;
2212                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2213                 if (rc < 0)
2214                         return ERR_PTR(rc);
2215                 buf = mdd_buf_alloc(env, rc);
2216                 if (buf->lb_buf == NULL)
2217                         return ERR_PTR(-ENOMEM);
2218                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2219         }
2220         if (rc < 0)
2221                 return ERR_PTR(rc);
2222
2223         leh = buf->lb_buf;
2224         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2225                 leh->leh_magic = LINK_EA_MAGIC;
2226                 leh->leh_reccount = __swab32(leh->leh_reccount);
2227                 leh->leh_len = __swab64(leh->leh_len);
2228                 /* entries are swabbed by mdd_lee_unpack */
2229         }
2230         if (leh->leh_magic != LINK_EA_MAGIC)
2231                 return ERR_PTR(-EINVAL);
2232
2233         return buf;
2234 }
2235
2236 /** Pack a link_ea_entry.
2237  * All elements are stored as chars to avoid alignment issues.
2238  * Numbers are always big-endian
2239  * \param packbuf is a temp fid buffer
2240  * \retval record length
2241  */
2242 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2243                          const struct lu_fid *pfid, struct lu_fid* packbuf)
2244 {
2245         char *ptr;
2246         int reclen;
2247
2248         fid_pack(&lee->lee_parent_fid, pfid, packbuf);
2249         ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len;
2250         strncpy(ptr, lname->ln_name, lname->ln_namelen);
2251         reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen +
2252                 sizeof(lee->lee_reclen);
2253         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2254         lee->lee_reclen[1] = reclen & 0xff;
2255         return reclen;
2256 }
2257
2258 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2259                     struct lu_name *lname, struct lu_fid *pfid)
2260 {
2261         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2262         fid_unpack(&lee->lee_parent_fid, pfid);
2263         lname->ln_name = (char *)&lee->lee_parent_fid +
2264                 lee->lee_parent_fid.fp_len;
2265         lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len -
2266                 sizeof(lee->lee_reclen);
2267 }
2268
2269 /** Add a record to the end of link ea buf */
2270 static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
2271                            const struct lu_fid *pfid,
2272                            const struct lu_name *lname)
2273 {
2274         struct link_ea_header *leh;
2275         struct link_ea_entry *lee;
2276         int reclen;
2277
2278         if (lname == NULL || pfid == NULL)
2279                 return -EINVAL;
2280
2281         /* Make sure our buf is big enough for the new one */
2282         leh = buf->lb_buf;
2283         reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2284         if (leh->leh_len + reclen > buf->lb_len) {
2285                 mdd_buf_grow(env, leh->leh_len + reclen);
2286                 if (buf->lb_buf == NULL)
2287                         return -ENOMEM;
2288         }
2289
2290         leh = buf->lb_buf;
2291         lee = buf->lb_buf + leh->leh_len;
2292         reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2);
2293         leh->leh_len += reclen;
2294         leh->leh_reccount++;
2295         return 0;
2296 }
2297
2298 /* For pathologic linkers, we don't want to spend lots of time scanning the
2299  * link ea.  Limit ourseleves to something reasonable; links not in the EA
2300  * can be looked up via (slower) parent lookup.
2301  */
2302 #define LINKEA_MAX_COUNT 128
2303
2304 static int mdd_links_add(const struct lu_env *env,
2305                          struct mdd_object *mdd_obj,
2306                          const struct lu_fid *pfid,
2307                          const struct lu_name *lname,
2308                          struct thandle *handle)
2309 {
2310         struct lu_buf *buf;
2311         struct link_ea_header *leh;
2312         int rc;
2313         ENTRY;
2314
2315         if (!mdd_linkea_enable)
2316                 RETURN(0);
2317
2318         buf = mdd_links_get(env, mdd_obj);
2319         if (IS_ERR(buf)) {
2320                 rc = PTR_ERR(buf);
2321                 if (rc != -ENODATA) {
2322                         CERROR("link_ea read failed %d "DFID"\n", rc,
2323                                PFID(mdd_object_fid(mdd_obj)));
2324                         RETURN (rc);
2325                 }
2326                 /* empty EA; start one */
2327                 buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2328                 if (buf->lb_buf == NULL)
2329                         RETURN(-ENOMEM);
2330                 leh = buf->lb_buf;
2331                 leh->leh_magic = LINK_EA_MAGIC;
2332                 leh->leh_len = sizeof(struct link_ea_header);
2333                 leh->leh_reccount = 0;
2334         }
2335
2336         leh = buf->lb_buf;
2337         if (leh->leh_reccount > LINKEA_MAX_COUNT)
2338                 RETURN(-EOVERFLOW);
2339
2340         rc = __mdd_links_add(env, buf, pfid, lname);
2341         if (rc)
2342                 RETURN(rc);
2343
2344         leh = buf->lb_buf;
2345         rc = __mdd_xattr_set(env, mdd_obj,
2346                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2347                              XATTR_NAME_LINK, 0, handle);
2348         if (rc)
2349                 CERROR("link_ea add failed %d "DFID"\n", rc,
2350                        PFID(mdd_object_fid(mdd_obj)));
2351
2352         if (buf->lb_vmalloc)
2353                 /* if we vmalloced a large buffer drop it */
2354                 mdd_buf_put(buf);
2355
2356         RETURN (rc);
2357 }
2358
2359 static int mdd_links_rename(const struct lu_env *env,
2360                             struct mdd_object *mdd_obj,
2361                             const struct lu_fid *oldpfid,
2362                             const struct lu_name *oldlname,
2363                             const struct lu_fid *newpfid,
2364                             const struct lu_name *newlname,
2365                             struct thandle *handle)
2366 {
2367         struct lu_buf  *buf;
2368         struct link_ea_header *leh;
2369         struct link_ea_entry  *lee;
2370         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
2371         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2372         int reclen = 0;
2373         int rc, count;
2374         ENTRY;
2375
2376         if (!mdd_linkea_enable)
2377                 RETURN(0);
2378
2379         if (mdd_obj->mod_flags & DEAD_OBJ)
2380                 /* No more links, don't bother */
2381                 RETURN(0);
2382
2383         buf = mdd_links_get(env, mdd_obj);
2384         if (IS_ERR(buf)) {
2385                 rc = PTR_ERR(buf);
2386                 CERROR("link_ea read failed %d "DFID"\n",
2387                        rc, PFID(mdd_object_fid(mdd_obj)));
2388                 RETURN(rc);
2389         }
2390         leh = buf->lb_buf;
2391         lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
2392
2393         /* Find the old record */
2394         for(count = 0; count <= leh->leh_reccount; count++) {
2395                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
2396                 if (tmpname->ln_namelen == oldlname->ln_namelen &&
2397                     lu_fid_eq(tmpfid, oldpfid) &&
2398                     (strncmp(tmpname->ln_name, oldlname->ln_name,
2399                              tmpname->ln_namelen) == 0))
2400                         break;
2401                 lee = (struct link_ea_entry *)((char *)lee + reclen);
2402         }
2403         if (count > leh->leh_reccount) {
2404                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2405                        oldlname->ln_namelen, oldlname->ln_name);
2406                 GOTO(out, rc = -ENOENT);
2407         }
2408
2409         /* Remove the old record */
2410         leh->leh_reccount--;
2411         leh->leh_len -= reclen;
2412         memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len -
2413                 (char *)lee);
2414
2415         /* If renaming, add the new record */
2416         if (newpfid != NULL) {
2417                 rc = __mdd_links_add(env, buf, newpfid, newlname);
2418                 if (rc)
2419                         GOTO(out, rc);
2420                 leh = buf->lb_buf;
2421         }
2422
2423         rc = __mdd_xattr_set(env, mdd_obj,
2424                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2425                              XATTR_NAME_LINK, 0, handle);
2426
2427 out:
2428         if (rc)
2429                 CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n",
2430                        oldlname->ln_namelen, oldlname->ln_name, rc,
2431                        PFID(mdd_object_fid(mdd_obj)));
2432
2433         if (buf->lb_vmalloc)
2434                 /* if we vmalloced a large buffer drop it */
2435                 mdd_buf_put(buf);
2436
2437         RETURN (rc);
2438 }
2439
2440 const struct md_dir_operations mdd_dir_ops = {
2441         .mdo_is_subdir     = mdd_is_subdir,
2442         .mdo_lookup        = mdd_lookup,
2443         .mdo_create        = mdd_create,
2444         .mdo_rename        = mdd_rename,
2445         .mdo_link          = mdd_link,
2446         .mdo_unlink        = mdd_unlink,
2447         .mdo_name_insert   = mdd_name_insert,
2448         .mdo_name_remove   = mdd_name_remove,
2449         .mdo_rename_tgt    = mdd_rename_tgt,
2450         .mdo_create_data   = mdd_create_data
2451 };