Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_ver.h>
57 #include <obd_support.h>
58 #include <lprocfs_status.h>
59 #ifdef HAVE_EXT4_LDISKFS
60 #include <ldiskfs/ldiskfs.h>
61 #else
62 #include <linux/ldiskfs_fs.h>
63 #endif
64 #include <lustre_mds.h>
65 #include <lustre/lustre_idl.h>
66 #include <lustre_fid.h>
67
68 #include "mdd_internal.h"
69
70 static const char dot[] = ".";
71 static const char dotdot[] = "..";
72
73 static struct lu_name lname_dotdot = {
74         (char *) dotdot,
75         sizeof(dotdot) - 1
76 };
77
78 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
79                         const struct lu_name *lname, struct lu_fid* fid,
80                         int mask);
81 static int mdd_declare_links_add(const struct lu_env *env,
82                                  struct mdd_object *mdd_obj,
83                                  struct thandle *handle);
84 static int mdd_links_add(const struct lu_env *env,
85                          struct mdd_object *mdd_obj,
86                          const struct lu_fid *pfid,
87                          const struct lu_name *lname,
88                          struct thandle *handle, int first);
89 static int mdd_links_rename(const struct lu_env *env,
90                             struct mdd_object *mdd_obj,
91                             const struct lu_fid *oldpfid,
92                             const struct lu_name *oldlname,
93                             const struct lu_fid *newpfid,
94                             const struct lu_name *newlname,
95                             struct thandle *handle);
96
97 static int
98 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
99                     const struct lu_name *lname, struct lu_fid* fid, int mask)
100 {
101         const char *name = lname->ln_name;
102         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
103         struct dynlock_handle *dlh;
104         int rc;
105
106         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
107         if (unlikely(dlh == NULL))
108                 return -ENOMEM;
109         rc = __mdd_lookup(env, pobj, lname, fid, mask);
110         mdd_pdo_read_unlock(env, mdd_obj, dlh);
111
112         return rc;
113 }
114
115 int mdd_lookup(const struct lu_env *env,
116                struct md_object *pobj, const struct lu_name *lname,
117                struct lu_fid* fid, struct md_op_spec *spec)
118 {
119         int rc;
120         ENTRY;
121         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
122         RETURN(rc);
123 }
124
125 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
126                           struct lu_fid *fid)
127 {
128         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
129 }
130
131 /*
132  * For root fid use special function, which does not compare version component
133  * of fid. Version component is different for root fids on all MDTs.
134  */
135 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
136 {
137         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
138                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
139 }
140
141 /*
142  * return 1: if lf is the fid of the ancestor of p1;
143  * return 0: if not;
144  *
145  * return -EREMOTE: if remote object is found, in this
146  * case fid of remote object is saved to @pf;
147  *
148  * otherwise: values < 0, errors.
149  */
150 static int mdd_is_parent(const struct lu_env *env,
151                          struct mdd_device *mdd,
152                          struct mdd_object *p1,
153                          const struct lu_fid *lf,
154                          struct lu_fid *pf)
155 {
156         struct mdd_object *parent = NULL;
157         struct lu_fid *pfid;
158         int rc;
159         ENTRY;
160
161         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
162         pfid = &mdd_env_info(env)->mti_fid;
163
164         /* Check for root first. */
165         if (mdd_is_root(mdd, mdo2fid(p1)))
166                 RETURN(0);
167
168         for(;;) {
169                 /* this is done recursively, bypass capa for each obj */
170                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
171                 rc = mdd_parent_fid(env, p1, pfid);
172                 if (rc)
173                         GOTO(out, rc);
174                 if (mdd_is_root(mdd, pfid))
175                         GOTO(out, rc = 0);
176                 if (lu_fid_eq(pfid, lf))
177                         GOTO(out, rc = 1);
178                 if (parent)
179                         mdd_object_put(env, parent);
180                 parent = mdd_object_find(env, mdd, pfid);
181
182                 /* cross-ref parent */
183                 if (parent == NULL) {
184                         if (pf != NULL)
185                                 *pf = *pfid;
186                         GOTO(out, rc = -EREMOTE);
187                 } else if (IS_ERR(parent))
188                         GOTO(out, rc = PTR_ERR(parent));
189                 p1 = parent;
190         }
191         EXIT;
192 out:
193         if (parent && !IS_ERR(parent))
194                 mdd_object_put(env, parent);
195         return rc;
196 }
197
198 /*
199  * No permission check is needed.
200  *
201  * returns 1: if fid is ancestor of @mo;
202  * returns 0: if fid is not a ancestor of @mo;
203  *
204  * returns EREMOTE if remote object is found, fid of remote object is saved to
205  * @fid;
206  *
207  * returns < 0: if error
208  */
209 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
210                   const struct lu_fid *fid, struct lu_fid *sfid)
211 {
212         struct mdd_device *mdd = mdo2mdd(mo);
213         int rc;
214         ENTRY;
215
216         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
217                 RETURN(0);
218
219         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
220         if (rc == 0) {
221                 /* found root */
222                 fid_zero(sfid);
223         } else if (rc == 1) {
224                 /* found @fid is parent */
225                 *sfid = *fid;
226                 rc = 0;
227         }
228         RETURN(rc);
229 }
230
231 /*
232  * Check that @dir contains no entries except (possibly) dot and dotdot.
233  *
234  * Returns:
235  *
236  *             0        empty
237  *      -ENOTDIR        not a directory object
238  *    -ENOTEMPTY        not empty
239  *           -ve        other error
240  *
241  */
242 static int mdd_dir_is_empty(const struct lu_env *env,
243                             struct mdd_object *dir)
244 {
245         struct dt_it     *it;
246         struct dt_object *obj;
247         const struct dt_it_ops *iops;
248         int result;
249         ENTRY;
250
251         obj = mdd_object_child(dir);
252         if (!dt_try_as_dir(env, obj))
253                 RETURN(-ENOTDIR);
254
255         iops = &obj->do_index_ops->dio_it;
256         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
257         if (!IS_ERR(it)) {
258                 result = iops->get(env, it, (const void *)"");
259                 if (result > 0) {
260                         int i;
261                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
262                                 result = iops->next(env, it);
263                         if (result == 0)
264                                 result = -ENOTEMPTY;
265                         else if (result == +1)
266                                 result = 0;
267                 } else if (result == 0)
268                         /*
269                          * Huh? Index contains no zero key?
270                          */
271                         result = -EIO;
272
273                 iops->put(env, it);
274                 iops->fini(env, it);
275         } else
276                 result = PTR_ERR(it);
277         RETURN(result);
278 }
279
280 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
281 {
282         struct mdd_device *m = mdd_obj2mdd_dev(obj);
283         struct lu_attr *la = &mdd_env_info(env)->mti_la;
284         int rc;
285         ENTRY;
286
287         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
288         if (rc)
289                 RETURN(rc);
290
291         /*
292          * Subdir count limitation can be broken through.
293          */
294         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
295             !S_ISDIR(la->la_mode))
296                 RETURN(-EMLINK);
297         else
298                 RETURN(0);
299 }
300
301 /*
302  * Check whether it may create the cobj under the pobj.
303  * cobj maybe NULL
304  */
305 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
306                    struct mdd_object *cobj, int check_perm, int check_nlink)
307 {
308         int rc = 0;
309         ENTRY;
310
311         if (cobj && mdd_object_exists(cobj))
312                 RETURN(-EEXIST);
313
314         if (mdd_is_dead_obj(pobj))
315                 RETURN(-ENOENT);
316
317         if (check_perm)
318                 rc = mdd_permission_internal_locked(env, pobj, NULL,
319                                                     MAY_WRITE | MAY_EXEC,
320                                                     MOR_TGT_PARENT);
321
322         if (!rc && check_nlink)
323                 rc = __mdd_may_link(env, pobj);
324
325         RETURN(rc);
326 }
327
328 /*
329  * Check whether can unlink from the pobj in the case of "cobj == NULL".
330  */
331 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
332                    const struct md_attr *ma)
333 {
334         int rc;
335         ENTRY;
336
337         if (mdd_is_dead_obj(pobj))
338                 RETURN(-ENOENT);
339
340         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
341             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
342                 RETURN(-EPERM);
343
344         rc = mdd_permission_internal_locked(env, pobj, NULL,
345                                             MAY_WRITE | MAY_EXEC,
346                                             MOR_TGT_PARENT);
347         if (rc)
348                 RETURN(rc);
349
350         if (mdd_is_append(pobj))
351                 RETURN(-EPERM);
352
353         RETURN(rc);
354 }
355
356 /*
357  * pobj == NULL is remote ops case, under such case, pobj's
358  * VTX feature has been checked already, no need check again.
359  */
360 static inline int mdd_is_sticky(const struct lu_env *env,
361                                 struct mdd_object *pobj,
362                                 struct mdd_object *cobj)
363 {
364         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
365         struct md_ucred *uc = md_ucred(env);
366         int rc;
367
368         if (pobj) {
369                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
370                 if (rc)
371                         return rc;
372
373                 if (!(tmp_la->la_mode & S_ISVTX) ||
374                      (tmp_la->la_uid == uc->mu_fsuid))
375                         return 0;
376         }
377
378         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
379         if (rc)
380                 return rc;
381
382         if (tmp_la->la_uid == uc->mu_fsuid)
383                 return 0;
384
385         return !mdd_capable(uc, CFS_CAP_FOWNER);
386 }
387
388 /*
389  * Check whether it may delete the cobj from the pobj.
390  * pobj maybe NULL
391  */
392 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
393                    struct mdd_object *cobj, struct md_attr *ma,
394                    int check_perm, int check_empty)
395 {
396         int rc = 0;
397         ENTRY;
398
399         LASSERT(cobj);
400         if (!mdd_object_exists(cobj))
401                 RETURN(-ENOENT);
402
403         if (mdd_is_dead_obj(cobj))
404                 RETURN(-ESTALE);
405
406         if (pobj) {
407                 if (!mdd_object_exists(pobj))
408                         RETURN(-ENOENT);
409
410                 if (mdd_is_dead_obj(pobj))
411                         RETURN(-ENOENT);
412
413                 if (check_perm) {
414                         rc = mdd_permission_internal_locked(env, pobj, NULL,
415                                                     MAY_WRITE | MAY_EXEC,
416                                                     MOR_TGT_PARENT);
417                         if (rc)
418                                 RETURN(rc);
419                 }
420
421                 if (mdd_is_append(pobj))
422                         RETURN(-EPERM);
423         }
424
425         if (!(ma->ma_attr_flags & MDS_VTX_BYPASS) &&
426             mdd_is_sticky(env, pobj, cobj))
427                 RETURN(-EPERM);
428
429         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
430                 RETURN(-EPERM);
431
432         if ((ma->ma_attr.la_valid & LA_FLAGS) &&
433             (ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
434                 RETURN(-EPERM);
435
436         if (S_ISDIR(ma->ma_attr.la_mode)) {
437                 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
438
439                 if (!S_ISDIR(mdd_object_type(cobj)))
440                         RETURN(-ENOTDIR);
441
442                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
443                         RETURN(-EBUSY);
444         } else if (S_ISDIR(mdd_object_type(cobj)))
445                 RETURN(-EISDIR);
446
447         if (S_ISDIR(ma->ma_attr.la_mode) && check_empty)
448                 rc = mdd_dir_is_empty(env, cobj);
449
450         RETURN(rc);
451 }
452
453 /*
454  * tgt maybe NULL
455  * has mdd_write_lock on src already, but not on tgt yet
456  */
457 int mdd_link_sanity_check(const struct lu_env *env,
458                           struct mdd_object *tgt_obj,
459                           const struct lu_name *lname,
460                           struct mdd_object *src_obj)
461 {
462         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
463         int rc = 0;
464         ENTRY;
465
466         if (!mdd_object_exists(src_obj))
467                 RETURN(-ENOENT);
468
469         if (mdd_is_dead_obj(src_obj))
470                 RETURN(-ESTALE);
471
472         /* Local ops, no lookup before link, check filename length here. */
473         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
474                 RETURN(-ENAMETOOLONG);
475
476         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
477                 RETURN(-EPERM);
478
479         if (S_ISDIR(mdd_object_type(src_obj)))
480                 RETURN(-EPERM);
481
482         LASSERT(src_obj != tgt_obj);
483         if (tgt_obj) {
484                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
485                 if (rc)
486                         RETURN(rc);
487         }
488
489         rc = __mdd_may_link(env, src_obj);
490
491         RETURN(rc);
492 }
493
494 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
495                                    const char *name, struct thandle *handle,
496                                    struct lustre_capa *capa)
497 {
498         struct dt_object *next = mdd_object_child(pobj);
499         int               rc;
500         ENTRY;
501
502         if (dt_try_as_dir(env, next)) {
503                 rc = next->do_index_ops->dio_delete(env, next,
504                                                     (struct dt_key *)name,
505                                                     handle, capa);
506         } else
507                 rc = -ENOTDIR;
508
509         RETURN(rc);
510 }
511
512 static int __mdd_index_insert_only(const struct lu_env *env,
513                                    struct mdd_object *pobj,
514                                    const struct lu_fid *lf, const char *name,
515                                    struct thandle *handle,
516                                    struct lustre_capa *capa)
517 {
518         struct dt_object *next = mdd_object_child(pobj);
519         int               rc;
520         ENTRY;
521
522         if (dt_try_as_dir(env, next)) {
523                 struct md_ucred  *uc = md_ucred(env);
524
525                 rc = next->do_index_ops->dio_insert(env, next,
526                                                     (struct dt_rec*)lf,
527                                                     (const struct dt_key *)name,
528                                                     handle, capa, uc->mu_cap &
529                                                     CFS_CAP_SYS_RESOURCE_MASK);
530         } else {
531                 rc = -ENOTDIR;
532         }
533         RETURN(rc);
534 }
535
536 /* insert named index, add reference if isdir */
537 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
538                               const struct lu_fid *lf, const char *name, int is_dir,
539                               struct thandle *handle, struct lustre_capa *capa)
540 {
541         int               rc;
542         ENTRY;
543
544         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
545         if (rc == 0 && is_dir) {
546                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
547                 mdo_ref_add(env, pobj, handle);
548                 mdd_write_unlock(env, pobj);
549         }
550         RETURN(rc);
551 }
552
553 /* delete named index, drop reference if isdir */
554 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
555                               const char *name, int is_dir, struct thandle *handle,
556                               struct lustre_capa *capa)
557 {
558         int               rc;
559         ENTRY;
560
561         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
562         if (rc == 0 && is_dir) {
563                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
564                 mdo_ref_del(env, pobj, handle);
565                 mdd_write_unlock(env, pobj);
566         }
567
568         RETURN(rc);
569 }
570
571 int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
572                             int reclen, struct thandle *handle)
573 {
574         int rc;
575
576         /* XXX: this is a temporary solution to declare llog changes
577          *      will be fixed in 2.3 with new llog implementation */
578
579         LASSERT(mdd->mdd_capa);
580
581         /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
582          *      have to set the parameter 'size' as INT_MAX or 0 to inform
583          *      OSD that this record write is for a llog write or catalog
584          *      header update, and osd declare function will reserve less
585          *      credits for optimization purpose.
586          *
587          *      Reserve 6 blocks for a llog write, since the llog file is
588          *      usually small, reserve 2 blocks for catalog header update,
589          *      because we know for sure that catalog header is already
590          *      allocated.
591          *
592          *      This hack should be removed in 2.3.
593          */
594
595         /* record itself */
596         rc = dt_declare_record_write(env, mdd->mdd_capa,
597                                      DECLARE_LLOG_WRITE, 0, handle);
598         if (rc)
599                 return rc;
600
601         /* header will be updated as well */
602         rc = dt_declare_record_write(env, mdd->mdd_capa,
603                                      DECLARE_LLOG_WRITE, 0, handle);
604         if (rc)
605                 return rc;
606
607         /* also we should be able to create new plain log */
608         rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
609         if (rc)
610                 return rc;
611
612         /* new record referencing new plain llog */
613         rc = dt_declare_record_write(env, mdd->mdd_capa,
614                                      DECLARE_LLOG_WRITE, 0, handle);
615         if (rc)
616                 return rc;
617
618         /* catalog's header will be updated as well */
619         rc = dt_declare_record_write(env, mdd->mdd_capa,
620                                      DECLARE_LLOG_REWRITE, 0, handle);
621
622         return rc;
623 }
624
625 int mdd_declare_changelog_store(const struct lu_env *env,
626                                 struct mdd_device *mdd,
627                                 const struct lu_name *fname,
628                                 struct thandle *handle)
629 {
630         int reclen;
631
632         /* Not recording */
633         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
634                 return 0;
635
636         /* we'll be writing payload + llog header */
637         reclen = sizeof(struct llog_changelog_rec);
638         if (fname)
639                 reclen += llog_data_len(fname->ln_namelen);
640
641         return mdd_declare_llog_record(env, mdd, reclen, handle);
642 }
643
644 /** Store a namespace change changelog record
645  * If this fails, we must fail the whole transaction; we don't
646  * want the change to commit without the log entry.
647  * \param target - mdd_object of change
648  * \param parent - parent dir/object
649  * \param tf - target lu_fid, overrides fid of \a target if this is non-null
650  * \param tname - target name string
651  * \param handle - transacion handle
652  */
653 static int mdd_changelog_ns_store(const struct lu_env  *env,
654                                   struct mdd_device    *mdd,
655                                   enum changelog_rec_type type,
656                                   int flags,
657                                   struct mdd_object    *target,
658                                   struct mdd_object    *parent,
659                                   const struct lu_fid  *tf,
660                                   const struct lu_name *tname,
661                                   struct thandle *handle)
662 {
663         const struct lu_fid *tfid;
664         const struct lu_fid *tpfid = mdo2fid(parent);
665         struct llog_changelog_rec *rec;
666         struct lu_buf *buf;
667         int reclen;
668         int rc;
669         ENTRY;
670
671         /* Not recording */
672         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
673                 RETURN(0);
674         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
675                 RETURN(0);
676
677         LASSERT(parent != NULL);
678         LASSERT(tname != NULL);
679         LASSERT(handle != NULL);
680
681         /* target */
682         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
683         buf = mdd_buf_alloc(env, reclen);
684         if (buf->lb_buf == NULL)
685                 RETURN(-ENOMEM);
686         rec = (struct llog_changelog_rec *)buf->lb_buf;
687
688         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
689         rec->cr.cr_type = (__u32)type;
690         tfid = tf ? tf : mdo2fid(target);
691         rec->cr.cr_tfid = *tfid;
692         rec->cr.cr_pfid = *tpfid;
693         rec->cr.cr_namelen = tname->ln_namelen;
694         memcpy(rec->cr.cr_name, tname->ln_name, rec->cr.cr_namelen);
695         if (likely(target))
696                 target->mod_cltime = cfs_time_current_64();
697
698         rc = mdd_changelog_llog_write(mdd, rec, handle);
699         if (rc < 0) {
700                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
701                        rc, type, tname->ln_name, PFID(tfid), PFID(tpfid));
702                 return -EFAULT;
703         }
704
705         return 0;
706 }
707
708 static int mdd_declare_link(const struct lu_env *env,
709                             struct mdd_device *mdd,
710                             struct mdd_object *p,
711                             struct mdd_object *c,
712                             const struct lu_name *name,
713                             struct thandle *handle)
714 {
715         int rc;
716
717         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
718         if (rc)
719                 return rc;
720
721         rc = mdo_declare_ref_add(env, c, handle);
722         if (rc)
723                 return rc;
724
725         rc = mdo_declare_attr_set(env, p, NULL, handle);
726         if (rc)
727                 return rc;
728
729         rc = mdo_declare_attr_set(env, c, NULL, handle);
730         if (rc)
731                 return rc;
732
733         rc = mdd_declare_links_add(env, c, handle);
734         if (rc)
735                 return rc;
736
737         rc = mdd_declare_changelog_store(env, mdd, name, handle);
738
739         return rc;
740 }
741
742 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
743                     struct md_object *src_obj, const struct lu_name *lname,
744                     struct md_attr *ma)
745 {
746         const char *name = lname->ln_name;
747         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
748         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
749         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
750         struct mdd_device *mdd = mdo2mdd(src_obj);
751         struct dynlock_handle *dlh;
752         struct thandle *handle;
753 #ifdef HAVE_QUOTA_SUPPORT
754         struct obd_device *obd = mdd->mdd_obd_dev;
755         struct obd_export *exp = md_quota(env)->mq_exp;
756         struct mds_obd *mds = &obd->u.mds;
757         unsigned int qids[MAXQUOTAS] = { 0, 0 };
758         int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
759 #endif
760         int rc;
761         ENTRY;
762
763 #ifdef HAVE_QUOTA_SUPPORT
764         if (mds->mds_quota) {
765                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
766
767                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
768                 if (!rc) {
769                         void *data = NULL;
770                         mdd_data_get(env, mdd_tobj, &data);
771                         quota_opc = FSFILT_OP_LINK;
772                         mdd_quota_wrapper(la_tmp, qids);
773                         /* get block quota for parent */
774                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
775                                         qids, rec_pending, 1, NULL,
776                                         LQUOTA_FLAGS_BLK, data, 1);
777                 }
778         }
779 #endif
780
781         handle = mdd_trans_create(env, mdd);
782         if (IS_ERR(handle))
783                 GOTO(out_pending, rc = PTR_ERR(handle));
784
785         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
786         if (rc)
787                 GOTO(stop, rc);
788
789         rc = mdd_trans_start(env, mdd, handle);
790         if (rc)
791                 GOTO(stop, rc);
792
793         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
794         if (dlh == NULL)
795                 GOTO(out_trans, rc = -ENOMEM);
796         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
797
798         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
799         if (rc)
800                 GOTO(out_unlock, rc);
801
802         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
803                                      name, handle,
804                                      mdd_object_capa(env, mdd_tobj));
805         if (rc)
806                 GOTO(out_unlock, rc);
807
808         mdo_ref_add(env, mdd_sobj, handle);
809
810         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
811         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
812
813         la->la_valid = LA_CTIME | LA_MTIME;
814         rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
815         if (rc)
816                 GOTO(out_unlock, rc);
817
818         la->la_valid = LA_CTIME;
819         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
820         if (rc == 0) {
821                 mdd_links_add(env, mdd_sobj,
822                               mdo2fid(mdd_tobj), lname, handle, 0);
823         }
824
825         EXIT;
826 out_unlock:
827         mdd_write_unlock(env, mdd_sobj);
828         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
829 out_trans:
830         if (rc == 0)
831                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
832                                             mdd_tobj, NULL, lname, handle);
833 stop:
834         mdd_trans_stop(env, mdd, rc, handle);
835 out_pending:
836 #ifdef HAVE_QUOTA_SUPPORT
837         if (quota_opc) {
838                 lquota_pending_commit(mds_quota_interface_ref, obd,
839                                       qids, rec_pending, 1);
840                 /* Trigger dqacq for the parent owner. If failed,
841                  * the next call for lquota_chkquota will process it. */
842                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
843                               quota_opc);
844         }
845 #endif
846         return rc;
847 }
848
849 int mdd_declare_finish_unlink(const struct lu_env *env,
850                               struct mdd_object *obj,
851                               struct md_attr *ma,
852                               struct thandle *handle)
853 {
854         int rc;
855
856         rc = orph_declare_index_insert(env, obj, handle);
857         if (rc)
858                 return rc;
859
860         return mdd_declare_object_kill(env, obj, ma, handle);
861 }
862
863 /* caller should take a lock before calling */
864 int mdd_finish_unlink(const struct lu_env *env,
865                       struct mdd_object *obj, struct md_attr *ma,
866                       struct thandle *th)
867 {
868         int rc;
869         int reset = 1;
870         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
871         ENTRY;
872
873         LASSERT(mdd_write_locked(env, obj) != 0);
874
875         /* read HSM flags, needed to set changelogs flags */
876         ma->ma_need = MA_HSM | MA_INODE;
877         rc = mdd_attr_get_internal(env, obj, ma);
878         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
879                 obj->mod_flags |= DEAD_OBJ;
880                 /* add new orphan and the object
881                  * will be deleted during mdd_close() */
882                 if (obj->mod_count) {
883                         rc = __mdd_orphan_add(env, obj, th);
884                         if (rc == 0)
885                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
886                                         "orphan list, open count = %d\n",
887                                         PFID(mdd_object_fid(obj)),
888                                         obj->mod_count);
889                         else
890                                 CERROR("Object "DFID" fail to be an orphan, "
891                                        "open count = %d, maybe cause failed "
892                                        "open replay\n",
893                                         PFID(mdd_object_fid(obj)),
894                                         obj->mod_count);
895                 } else {
896                         rc = mdd_object_kill(env, obj, ma, th);
897                         if (rc == 0)
898                                 reset = 0;
899                 }
900
901                 /* get the i_nlink */
902                 ma->ma_need = MA_INODE;
903                 rc = mdd_attr_get_internal(env, obj, ma);
904         }
905         if (reset)
906                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
907
908         RETURN(rc);
909 }
910
911 /*
912  * pobj maybe NULL
913  * has mdd_write_lock on cobj already, but not on pobj yet
914  */
915 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
916                             struct mdd_object *cobj, struct md_attr *ma)
917 {
918         int rc;
919         ENTRY;
920
921         rc = mdd_may_delete(env, pobj, cobj, ma, 1, 1);
922
923         RETURN(rc);
924 }
925
926 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
927                               struct mdd_object *p, struct mdd_object *c,
928                               const struct lu_name *name, struct md_attr *ma,
929                               struct thandle *handle)
930 {
931         int rc;
932
933         rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
934         if (rc)
935                 return rc;
936
937         rc = mdo_declare_ref_del(env, p, handle);
938         if (rc)
939                 return rc;
940
941         rc = mdo_declare_ref_del(env, c, handle);
942         if (rc)
943                 return rc;
944
945         rc = mdo_declare_ref_del(env, c, handle);
946         if (rc)
947                 return rc;
948
949         rc = mdo_declare_attr_set(env, p, NULL, handle);
950         if (rc)
951                 return rc;
952
953         rc = mdo_declare_attr_set(env, c, NULL, handle);
954         if (rc)
955                 return rc;
956
957         rc = mdd_declare_finish_unlink(env, c, ma, handle);
958         if (rc)
959                 return rc;
960
961         rc = mdd_declare_links_add(env, c, handle);
962         if (rc)
963                 return rc;
964
965         rc = mdd_declare_changelog_store(env, mdd, name, handle);
966
967         return rc;
968 }
969
970 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
971                       struct md_object *cobj, const struct lu_name *lname,
972                       struct md_attr *ma)
973 {
974         const char *name = lname->ln_name;
975         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
976         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
977         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
978         struct mdd_device *mdd = mdo2mdd(pobj);
979         struct dynlock_handle *dlh;
980         struct thandle    *handle;
981 #ifdef HAVE_QUOTA_SUPPORT
982         struct obd_device *obd = mdd->mdd_obd_dev;
983         struct mds_obd *mds = &obd->u.mds;
984         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
985         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
986         int quota_opc = 0;
987 #endif
988         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
989         int rc;
990         ENTRY;
991
992         if (mdd_object_exists(mdd_cobj) <= 0)
993                 RETURN(-ENOENT);
994
995         handle = mdd_trans_create(env, mdd);
996         if (IS_ERR(handle))
997                 RETURN(PTR_ERR(handle));
998
999         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1000                                 lname, ma, handle);
1001         if (rc)
1002                 GOTO(stop, rc);
1003
1004         rc = mdd_trans_start(env, mdd, handle);
1005         if (rc)
1006                 GOTO(stop, rc);
1007
1008         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1009         if (dlh == NULL)
1010                 GOTO(out_trans, rc = -ENOMEM);
1011         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1012
1013         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
1014         if (rc)
1015                 GOTO(cleanup, rc);
1016
1017         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1018                                 mdd_object_capa(env, mdd_pobj));
1019         if (rc)
1020                 GOTO(cleanup, rc);
1021
1022         mdo_ref_del(env, mdd_cobj, handle);
1023         if (is_dir)
1024                 /* unlink dot */
1025                 mdo_ref_del(env, mdd_cobj, handle);
1026
1027         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1028         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1029
1030         la->la_valid = LA_CTIME | LA_MTIME;
1031         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
1032         if (rc)
1033                 GOTO(cleanup, rc);
1034
1035         if (ma->ma_attr.la_nlink > 0 || mdd_cobj->mod_count > 0) {
1036                 /* update ctime of an unlinked file only if it is still
1037                  * opened or a link still exists */
1038                 la->la_valid = LA_CTIME;
1039                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1040                 if (rc)
1041                         GOTO(cleanup, rc);
1042         }
1043
1044         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1045 #ifdef HAVE_QUOTA_SUPPORT
1046         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1047             ma->ma_attr.la_nlink == 0) {
1048                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1049
1050                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1051                 if (!rc) {
1052                         mdd_quota_wrapper(la_tmp, qpids);
1053                         if (mdd_cobj->mod_count == 0) {
1054                                 quota_opc = FSFILT_OP_UNLINK;
1055                                 mdd_quota_wrapper(&ma->ma_attr, qcids);
1056                         } else {
1057                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
1058                         }
1059                 }
1060         }
1061 #endif
1062         if (!is_dir)
1063                 /* old files may not have link ea; ignore errors */
1064                 mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj),
1065                                  lname, NULL, NULL, handle);
1066
1067         EXIT;
1068 cleanup:
1069         mdd_write_unlock(env, mdd_cobj);
1070         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1071 out_trans:
1072         if (rc == 0) {
1073                 int cl_flags;
1074
1075                 cl_flags = (ma->ma_attr.la_nlink == 0) ? CLF_UNLINK_LAST : 0;
1076                 if ((ma->ma_valid & MA_HSM) &&
1077                     (ma->ma_hsm.mh_flags & HS_EXISTS))
1078                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1079
1080                 rc = mdd_changelog_ns_store(env, mdd,
1081                          is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1082                          mdd_cobj, mdd_pobj, NULL, lname, handle);
1083         }
1084
1085 stop:
1086         mdd_trans_stop(env, mdd, rc, handle);
1087 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0)
1088         if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV &&
1089             ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY)
1090                 /* Since echo client is incapable of destorying ost object,
1091                  * it will destory the object here. */
1092                 rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la,
1093                                        ma->ma_lmm, ma->ma_lmm_size,
1094                                        ma->ma_cookie, 1);
1095 #else
1096 #warning "please remove this after 2.4 (LOD/OSP)."
1097 #endif
1098
1099 #ifdef HAVE_QUOTA_SUPPORT
1100         if (quota_opc)
1101                 /* Trigger dqrel on the owner of child and parent. If failed,
1102                  * the next call for lquota_chkquota will process it. */
1103                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
1104                               quota_opc);
1105 #endif
1106         return rc;
1107 }
1108
1109 /* has not lock on pobj yet */
1110 static int mdd_ni_sanity_check(const struct lu_env *env,
1111                                struct md_object *pobj,
1112                                const struct md_attr *ma)
1113 {
1114         struct mdd_object *obj = md2mdd_obj(pobj);
1115         int rc;
1116         ENTRY;
1117
1118         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
1119                 RETURN(0);
1120
1121         rc = mdd_may_create(env, obj, NULL, 1, S_ISDIR(ma->ma_attr.la_mode));
1122
1123         RETURN(rc);
1124 }
1125
1126 /*
1127  * Partial operation.
1128  */
1129 static int mdd_name_insert(const struct lu_env *env,
1130                            struct md_object *pobj,
1131                            const struct lu_name *lname,
1132                            const struct lu_fid *fid,
1133                            const struct md_attr *ma)
1134 {
1135         const char *name = lname->ln_name;
1136         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
1137         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1138         struct mdd_device *mdd = mdo2mdd(pobj);
1139         struct dynlock_handle *dlh;
1140         struct thandle *handle;
1141         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1142 #ifdef HAVE_QUOTA_SUPPORT
1143         struct md_ucred *uc = md_ucred(env);
1144         struct obd_device *obd = mdd->mdd_obd_dev;
1145         struct obd_export *exp = md_quota(env)->mq_exp;
1146         struct mds_obd *mds = &obd->u.mds;
1147         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1148         int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 };
1149         cfs_cap_t save = uc->mu_cap;
1150 #endif
1151         int rc;
1152         ENTRY;
1153
1154         /* XXX: this code won't be used ever:
1155          * DNE uses slightly different approach */
1156         LBUG();
1157
1158 #ifdef HAVE_QUOTA_SUPPORT
1159         if (mds->mds_quota) {
1160                 if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
1161                         struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1162
1163                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1164                         if (!rc) {
1165                                 void *data = NULL;
1166                                 mdd_data_get(env, mdd_obj, &data);
1167                                 quota_opc = FSFILT_OP_LINK;
1168                                 mdd_quota_wrapper(la_tmp, qids);
1169                                 /* get block quota for parent */
1170                                 lquota_chkquota(mds_quota_interface_ref, obd,
1171                                                 exp, qids, rec_pending, 1, NULL,
1172                                                 LQUOTA_FLAGS_BLK, data, 1);
1173                         }
1174                 } else {
1175                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
1176                 }
1177         }
1178 #endif
1179         handle = mdd_trans_create(env, mdo2mdd(pobj));
1180         if (IS_ERR(handle))
1181                 GOTO(out_pending, rc = PTR_ERR(handle));
1182
1183         rc = mdd_trans_start(env, mdo2mdd(pobj), handle);
1184
1185         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1186         if (dlh == NULL)
1187                 GOTO(out_trans, rc = -ENOMEM);
1188
1189         rc = mdd_ni_sanity_check(env, pobj, ma);
1190         if (rc)
1191                 GOTO(out_unlock, rc);
1192
1193         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
1194                                 handle, BYPASS_CAPA);
1195         if (rc)
1196                 GOTO(out_unlock, rc);
1197
1198         /*
1199          * For some case, no need update obj's ctime (LA_CTIME is not set),
1200          * e.g. split_dir.
1201          * For other cases, update obj's ctime (LA_CTIME is set),
1202          * e.g. cmr_link.
1203          */
1204         if (ma->ma_attr.la_valid & LA_CTIME) {
1205                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1206                 la->la_valid = LA_CTIME | LA_MTIME;
1207                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1208                                                         handle, 0);
1209         }
1210         EXIT;
1211 out_unlock:
1212         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1213 out_trans:
1214         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
1215 out_pending:
1216 #ifdef HAVE_QUOTA_SUPPORT
1217         if (mds->mds_quota) {
1218                 if (quota_opc) {
1219                         lquota_pending_commit(mds_quota_interface_ref,
1220                                               obd, qids, rec_pending, 1);
1221                         /* Trigger dqacq for the parent owner. If failed,
1222                          * the next call for lquota_chkquota will process it*/
1223                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
1224                                       rc, quota_opc);
1225                 } else {
1226                         uc->mu_cap = save;
1227                 }
1228         }
1229 #endif
1230         return rc;
1231 }
1232
1233 /* has not lock on pobj yet */
1234 static int mdd_nr_sanity_check(const struct lu_env *env,
1235                                struct md_object *pobj,
1236                                const struct md_attr *ma)
1237 {
1238         struct mdd_object *obj = md2mdd_obj(pobj);
1239         int rc;
1240         ENTRY;
1241
1242         if (ma->ma_attr_flags & MDS_PERM_BYPASS)
1243                 RETURN(0);
1244
1245         rc = mdd_may_unlink(env, obj, ma);
1246
1247         RETURN(rc);
1248 }
1249
1250 /*
1251  * Partial operation.
1252  */
1253 static int mdd_name_remove(const struct lu_env *env,
1254                            struct md_object *pobj,
1255                            const struct lu_name *lname,
1256                            const struct md_attr *ma)
1257 {
1258         const char *name = lname->ln_name;
1259         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1260         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
1261         struct mdd_device *mdd = mdo2mdd(pobj);
1262         struct dynlock_handle *dlh;
1263         struct thandle *handle;
1264         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1265 #ifdef HAVE_QUOTA_SUPPORT
1266         struct obd_device *obd = mdd->mdd_obd_dev;
1267         struct mds_obd *mds = &obd->u.mds;
1268         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1269         int quota_opc = 0;
1270 #endif
1271         int rc;
1272         ENTRY;
1273
1274         /* XXX: this code won't be used ever:
1275          * DNE uses slightly different approach */
1276         LBUG();
1277
1278 #ifdef HAVE_QUOTA_SUPPORT
1279         if (mds->mds_quota) {
1280                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1281
1282                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1283                 if (!rc) {
1284                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
1285                         mdd_quota_wrapper(la_tmp, qids);
1286                 }
1287         }
1288 #endif
1289         handle = mdd_trans_create(env, mdd);
1290         if (IS_ERR(handle))
1291                 GOTO(out_pending, rc = PTR_ERR(handle));
1292
1293         rc = mdd_trans_start(env, mdd, handle);
1294
1295         dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
1296         if (dlh == NULL)
1297                 GOTO(out_trans, rc = -ENOMEM);
1298
1299         rc = mdd_nr_sanity_check(env, pobj, ma);
1300         if (rc)
1301                 GOTO(out_unlock, rc);
1302
1303         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
1304                                 handle, BYPASS_CAPA);
1305         if (rc)
1306                 GOTO(out_unlock, rc);
1307
1308         /*
1309          * For some case, no need update obj's ctime (LA_CTIME is not set),
1310          * e.g. split_dir.
1311          * For other cases, update obj's ctime (LA_CTIME is set),
1312          * e.g. cmr_unlink.
1313          */
1314         if (ma->ma_attr.la_valid & LA_CTIME) {
1315                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1316                 la->la_valid = LA_CTIME | LA_MTIME;
1317                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
1318                                                         handle, 0);
1319         }
1320         EXIT;
1321 out_unlock:
1322         mdd_pdo_write_unlock(env, mdd_obj, dlh);
1323 out_trans:
1324         mdd_trans_stop(env, mdd, rc, handle);
1325 out_pending:
1326 #ifdef HAVE_QUOTA_SUPPORT
1327         /* Trigger dqrel for the parent owner.
1328          * If failed, the next call for lquota_chkquota will process it. */
1329         if (quota_opc)
1330                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
1331                               quota_opc);
1332 #endif
1333         return rc;
1334 }
1335
1336 /*
1337  * tobj maybe NULL
1338  * has mdd_write_lock on tobj alreay, but not on tgt_pobj yet
1339  */
1340 static int mdd_rt_sanity_check(const struct lu_env *env,
1341                                struct mdd_object *tgt_pobj,
1342                                struct mdd_object *tobj,
1343                                struct md_attr *ma)
1344 {
1345         int rc;
1346         ENTRY;
1347
1348         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
1349                 RETURN(0);
1350
1351         /* XXX: for mdd_rename_tgt, "tobj == NULL" does not mean tobj not
1352          * exist. In fact, tobj must exist, otherwise the call trace will be:
1353          * mdt_reint_rename_tgt -> mdo_name_insert -> ... -> mdd_name_insert.
1354          * When get here, tobj must be NOT NULL, the other case has been
1355          * processed in cmr_rename_tgt before mdd_rename_tgt and enable
1356          * MDS_PERM_BYPASS.
1357          * So check may_delete, but not check nlink of tgt_pobj. */
1358
1359         rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1);
1360
1361         RETURN(rc);
1362 }
1363
1364 /* Partial rename op on slave MDD */
1365 static int mdd_rename_tgt(const struct lu_env *env,
1366                           struct md_object *pobj, struct md_object *tobj,
1367                           const struct lu_fid *lf, const struct lu_name *lname,
1368                           struct md_attr *ma)
1369 {
1370         const char *name = lname->ln_name;
1371         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1372         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
1373         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
1374         struct mdd_device *mdd = mdo2mdd(pobj);
1375         struct dynlock_handle *dlh;
1376         struct thandle *handle;
1377 #ifdef HAVE_QUOTA_SUPPORT
1378         struct obd_device *obd = mdd->mdd_obd_dev;
1379         struct obd_export *exp = md_quota(env)->mq_exp;
1380         struct mds_obd *mds = &obd->u.mds;
1381         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1382         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1383         int quota_copc = 0, quota_popc = 0;
1384         int rec_pending[MAXQUOTAS] = { 0, 0 };
1385 #endif
1386         int rc;
1387         ENTRY;
1388
1389         /* XXX: this code won't be used ever:
1390          * DNE uses slightly different approach */
1391         LBUG();
1392
1393 #ifdef HAVE_QUOTA_SUPPORT
1394         if (mds->mds_quota && !tobj) {
1395                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1396
1397                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
1398                 if (!rc) {
1399                         void *data = NULL;
1400                         mdd_data_get(env, mdd_tpobj, &data);
1401                         quota_popc = FSFILT_OP_LINK;
1402                         mdd_quota_wrapper(la_tmp, qpids);
1403                         /* get block quota for target parent */
1404                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1405                                         qpids, rec_pending, 1, NULL,
1406                                         LQUOTA_FLAGS_BLK, data, 1);
1407                 }
1408         }
1409 #endif
1410         handle = mdd_trans_create(env, mdd);
1411         if (IS_ERR(handle))
1412                 GOTO(out_pending, rc = PTR_ERR(handle));
1413
1414         rc = mdd_trans_start(env, mdd, handle);
1415
1416         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
1417         if (dlh == NULL)
1418                 GOTO(out_trans, rc = -ENOMEM);
1419         if (tobj)
1420                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
1421
1422         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
1423         if (rc)
1424                 GOTO(cleanup, rc);
1425
1426         /*
1427          * If rename_tgt is called then we should just re-insert name with
1428          * correct fid, no need to dec/inc parent nlink if obj is dir.
1429          */
1430         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
1431         if (rc)
1432                 GOTO(cleanup, rc);
1433
1434         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
1435                                      BYPASS_CAPA);
1436         if (rc)
1437                 GOTO(cleanup, rc);
1438
1439         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1440         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1441
1442         la->la_valid = LA_CTIME | LA_MTIME;
1443         rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
1444         if (rc)
1445                 GOTO(cleanup, rc);
1446
1447         /*
1448          * For tobj is remote case cmm layer has processed
1449          * and pass NULL tobj to here. So when tobj is NOT NULL,
1450          * it must be local one.
1451          */
1452         if (tobj && mdd_object_exists(mdd_tobj)) {
1453                 mdo_ref_del(env, mdd_tobj, handle);
1454
1455                 /* Remove dot reference. */
1456                 if (S_ISDIR(ma->ma_attr.la_mode))
1457                         mdo_ref_del(env, mdd_tobj, handle);
1458
1459                 la->la_valid = LA_CTIME;
1460                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1461                 if (rc)
1462                         GOTO(cleanup, rc);
1463
1464                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1465                 if (rc)
1466                         GOTO(cleanup, rc);
1467
1468 #ifdef HAVE_QUOTA_SUPPORT
1469                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1470                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
1471                         quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1472                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1473                 }
1474 #endif
1475         }
1476         EXIT;
1477 cleanup:
1478         if (tobj)
1479                 mdd_write_unlock(env, mdd_tobj);
1480         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
1481 out_trans:
1482         if (rc == 0)
1483                 /* Bare EXT record with no RENAME in front of it signifies
1484                    a partial slave op */
1485                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj,
1486                                             mdd_tpobj, NULL, lname, handle);
1487
1488         mdd_trans_stop(env, mdd, rc, handle);
1489 out_pending:
1490 #ifdef HAVE_QUOTA_SUPPORT
1491         if (mds->mds_quota) {
1492                 if (quota_popc)
1493                         lquota_pending_commit(mds_quota_interface_ref, obd,
1494                                               qpids, rec_pending, 1);
1495
1496                 if (quota_copc)
1497                         /* Trigger dqrel on the target owner of child.
1498                          * If failed, the next call for lquota_chkquota
1499                          * will process it. */
1500                         lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids,
1501                                       rc, quota_copc);
1502         }
1503 #endif
1504         return rc;
1505 }
1506
1507 /*
1508  * The permission has been checked when obj created, no need check again.
1509  */
1510 static int mdd_cd_sanity_check(const struct lu_env *env,
1511                                struct mdd_object *obj)
1512 {
1513         ENTRY;
1514
1515         /* EEXIST check */
1516         if (!obj || mdd_is_dead_obj(obj))
1517                 RETURN(-ENOENT);
1518
1519         RETURN(0);
1520
1521 }
1522
1523 static int mdd_declare_create_data(const struct lu_env *env,
1524                                    struct mdd_device *mdd,
1525                                    struct mdd_object *obj,
1526                                    int lmm_size,
1527                                    struct thandle *handle)
1528 {
1529         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1530         int            rc;
1531
1532         buf->lb_buf = NULL;
1533         buf->lb_len = lmm_size;
1534         rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1535                                    0, handle);
1536         if (rc)
1537                 return rc;
1538
1539         rc = mdd_declare_lov_objid_update(env, mdd, handle);
1540
1541         return rc;
1542 }
1543
1544 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1545                            struct md_object *cobj, const struct md_op_spec *spec,
1546                            struct md_attr *ma)
1547 {
1548         struct mdd_device *mdd = mdo2mdd(cobj);
1549         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1550         struct mdd_object *son = md2mdd_obj(cobj);
1551         struct lov_mds_md *lmm = NULL;
1552         int                lmm_size = 0;
1553         struct thandle    *handle;
1554         int                rc;
1555         ENTRY;
1556
1557         rc = mdd_cd_sanity_check(env, son);
1558         if (rc)
1559                 RETURN(rc);
1560
1561         if (!md_should_create(spec->sp_cr_flags))
1562                 RETURN(0);
1563         lmm_size = ma->ma_lmm_size;
1564
1565         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma);
1566         if (rc)
1567                 RETURN(rc);
1568
1569         handle = mdd_trans_create(env, mdd);
1570         if (IS_ERR(handle))
1571                 GOTO(out_free, rc = PTR_ERR(handle));
1572
1573         rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle);
1574         if (rc)
1575                 GOTO(stop, rc);
1576
1577         rc = mdd_trans_start(env, mdd, handle);
1578         if (rc)
1579                 GOTO(stop, rc);
1580
1581         /*
1582          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1583          * Should this be fixed?
1584          */
1585
1586         /* Replay creates has objects already */
1587 #if 0
1588         if (spec->no_create) {
1589                 CDEBUG(D_INFO, "we already have lov ea\n");
1590                 rc = mdd_lov_set_md(env, mdd_pobj, son,
1591                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
1592                                     spec->u.sp_ea.eadatalen, handle, 0);
1593         } else
1594 #endif
1595                 /* No need mdd_lsm_sanity_check here */
1596                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
1597                                     lmm_size, handle, 0);
1598
1599         if (rc == 0)
1600                rc = mdd_attr_get_internal_locked(env, son, ma);
1601
1602         /* update lov_objid data, must be before transaction stop! */
1603         if (rc == 0)
1604                 mdd_lov_objid_update(mdd, lmm);
1605
1606 stop:
1607         mdd_trans_stop(env, mdd, rc, handle);
1608 out_free:
1609         /* Finish mdd_lov_create() stuff. */
1610         /* if no_create == 0 (not replay), we free lmm allocated by
1611          * mdd_lov_create() */
1612         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
1613         RETURN(rc);
1614 }
1615
1616 /* Get fid from name and parent */
1617 static int
1618 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1619              const struct lu_name *lname, struct lu_fid* fid, int mask)
1620 {
1621         const char          *name = lname->ln_name;
1622         const struct dt_key *key = (const struct dt_key *)name;
1623         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1624         struct mdd_device   *m = mdo2mdd(pobj);
1625         struct dt_object    *dir = mdd_object_child(mdd_obj);
1626         int rc;
1627         ENTRY;
1628
1629         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1630                 RETURN(-ESTALE);
1631
1632         rc = mdd_object_exists(mdd_obj);
1633         if (unlikely(rc == 0))
1634                 RETURN(-ESTALE);
1635         else if (unlikely(rc < 0)) {
1636                 CERROR("Object "DFID" locates on remote server\n",
1637                         PFID(mdo2fid(mdd_obj)));
1638                 RETURN(-EINVAL);
1639         }
1640
1641         /* The common filename length check. */
1642         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1643                 RETURN(-ENAMETOOLONG);
1644
1645         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1646                                             MOR_TGT_PARENT);
1647         if (rc)
1648                 RETURN(rc);
1649
1650         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1651                    dt_try_as_dir(env, dir))) {
1652
1653                 rc = dir->do_index_ops->dio_lookup(env, dir,
1654                                                  (struct dt_rec *)fid, key,
1655                                                  mdd_object_capa(env, mdd_obj));
1656                 if (rc > 0)
1657                         rc = 0;
1658                 else if (rc == 0)
1659                         rc = -ENOENT;
1660         } else
1661                 rc = -ENOTDIR;
1662
1663         RETURN(rc);
1664 }
1665
1666 int mdd_declare_object_initialize(const struct lu_env *env,
1667                                   struct mdd_object *child,
1668                                   struct md_attr *ma,
1669                                   struct thandle *handle)
1670 {
1671         int rc;
1672
1673         rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle);
1674         if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) {
1675                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1676                                 dot, handle);
1677                 if (rc == 0)
1678                         rc = mdo_declare_ref_add(env, child, handle);
1679         }
1680         if (rc == 0)
1681                 mdd_declare_links_add(env, child, handle);
1682
1683         return rc;
1684 }
1685
1686 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1687                           const struct lu_name *lname, struct mdd_object *child,
1688                           struct md_attr *ma, struct thandle *handle,
1689                           const struct md_op_spec *spec)
1690 {
1691         int rc;
1692         ENTRY;
1693
1694         /*
1695          * Update attributes for child.
1696          *
1697          * FIXME:
1698          *  (1) the valid bits should be converted between Lustre and Linux;
1699          *  (2) maybe, the child attributes should be set in OSD when creation.
1700          */
1701
1702         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
1703         if (rc != 0)
1704                 RETURN(rc);
1705
1706         if (S_ISDIR(ma->ma_attr.la_mode)) {
1707                 /* Add "." and ".." for newly created dir */
1708                 mdo_ref_add(env, child, handle);
1709                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1710                                              dot, handle, BYPASS_CAPA);
1711                 if (rc == 0)
1712                         rc = __mdd_index_insert_only(env, child, pfid,
1713                                                      dotdot, handle,
1714                                                      BYPASS_CAPA);
1715                 if (rc != 0)
1716                         mdo_ref_del(env, child, handle);
1717         }
1718         if (rc == 0)
1719                 mdd_links_add(env, child, pfid, lname, handle, 1);
1720
1721         RETURN(rc);
1722 }
1723
1724 /* has not lock on pobj yet */
1725 static int mdd_create_sanity_check(const struct lu_env *env,
1726                                    struct md_object *pobj,
1727                                    const struct lu_name *lname,
1728                                    struct md_attr *ma,
1729                                    struct md_op_spec *spec)
1730 {
1731         struct mdd_thread_info *info = mdd_env_info(env);
1732         struct lu_attr    *la        = &info->mti_la;
1733         struct lu_fid     *fid       = &info->mti_fid;
1734         struct mdd_object *obj       = md2mdd_obj(pobj);
1735         struct mdd_device *m         = mdo2mdd(pobj);
1736         int lookup                   = spec->sp_cr_lookup;
1737         int rc;
1738         ENTRY;
1739
1740         /* EEXIST check */
1741         if (mdd_is_dead_obj(obj))
1742                 RETURN(-ENOENT);
1743
1744         /*
1745          * In some cases this lookup is not needed - we know before if name
1746          * exists or not because MDT performs lookup for it.
1747          * name length check is done in lookup.
1748          */
1749         if (lookup) {
1750                 /*
1751                  * Check if the name already exist, though it will be checked in
1752                  * _index_insert also, for avoiding rolling back if exists
1753                  * _index_insert.
1754                  */
1755                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1756                                          MAY_WRITE | MAY_EXEC);
1757                 if (rc != -ENOENT)
1758                         RETURN(rc ? : -EEXIST);
1759         } else {
1760                 /*
1761                  * Check WRITE permission for the parent.
1762                  * EXEC permission have been checked
1763                  * when lookup before create already.
1764                  */
1765                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
1766                                                     MOR_TGT_PARENT);
1767                 if (rc)
1768                         RETURN(rc);
1769         }
1770
1771         /* sgid check */
1772         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1773         if (rc != 0)
1774                 RETURN(rc);
1775
1776         if (la->la_mode & S_ISGID) {
1777                 ma->ma_attr.la_gid = la->la_gid;
1778                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1779                         ma->ma_attr.la_mode |= S_ISGID;
1780                         ma->ma_attr.la_valid |= LA_MODE;
1781                 }
1782         }
1783
1784         switch (ma->ma_attr.la_mode & S_IFMT) {
1785         case S_IFLNK: {
1786                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1787
1788                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1789                         RETURN(-ENAMETOOLONG);
1790                 else
1791                         RETURN(0);
1792         }
1793         case S_IFDIR:
1794         case S_IFREG:
1795         case S_IFCHR:
1796         case S_IFBLK:
1797         case S_IFIFO:
1798         case S_IFSOCK:
1799                 rc = 0;
1800                 break;
1801         default:
1802                 rc = -EINVAL;
1803                 break;
1804         }
1805         RETURN(rc);
1806 }
1807
1808 static int mdd_declare_create(const struct lu_env *env,
1809                               struct mdd_device *mdd,
1810                               struct mdd_object *p,
1811                               struct mdd_object *c,
1812                               const struct lu_name *name,
1813                               struct md_attr *ma,
1814                               int lmm_size,
1815                               struct thandle *handle,
1816                               const struct md_op_spec *spec)
1817 {
1818         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1819         int            rc = 0;
1820
1821         rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec);
1822         if (rc)
1823                 GOTO(out, rc);
1824
1825         /* if dir, then can inherit default ACl */
1826         buf->lb_buf = NULL;
1827         buf->lb_len = lmm_size;
1828         if (S_ISDIR(ma->ma_attr.la_mode)) {
1829                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT,
1830                                            0, handle);
1831                 if (rc == 0)
1832                         rc = mdo_declare_ref_add(env, p, handle);
1833         }
1834         if (rc)
1835                 GOTO(out, rc);
1836
1837         rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS,
1838                                    0, handle);
1839         if (rc)
1840                 GOTO(out, rc);
1841
1842         rc = mdd_declare_object_initialize(env, c, ma, handle);
1843         if (rc)
1844                 GOTO(out, rc);
1845
1846         rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1847                                       name->ln_name, handle);
1848         if (rc)
1849                 GOTO(out, rc);
1850
1851         rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1852                                    0, handle);
1853         if (rc)
1854                 GOTO(out, rc);
1855
1856         if (S_ISLNK(ma->ma_attr.la_mode)) {
1857                 rc = dt_declare_record_write(env, mdd_object_child(c),
1858                                              strlen(spec->u.sp_symname), 0,
1859                                              handle);
1860                 if (rc)
1861                         GOTO(out, rc);
1862         }
1863         rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle);
1864         if (rc)
1865                 return rc;
1866
1867         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1868         if (rc)
1869                 return rc;
1870
1871         rc = mdd_declare_lov_objid_update(env, mdd, handle);
1872
1873 out:
1874         return rc;
1875 }
1876
1877
1878 /*
1879  * Create object and insert it into namespace.
1880  */
1881 static int mdd_create(const struct lu_env *env,
1882                       struct md_object *pobj,
1883                       const struct lu_name *lname,
1884                       struct md_object *child,
1885                       struct md_op_spec *spec,
1886                       struct md_attr* ma)
1887 {
1888         struct mdd_thread_info *info = mdd_env_info(env);
1889         struct lu_attr         *la = &info->mti_la_for_fix;
1890         struct md_attr         *ma_acl = &info->mti_ma;
1891         struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
1892         struct mdd_object      *son = md2mdd_obj(child);
1893         struct mdd_device      *mdd = mdo2mdd(pobj);
1894         struct lu_attr         *attr = &ma->ma_attr;
1895         struct lov_mds_md      *lmm = NULL;
1896         struct thandle         *handle;
1897         struct dynlock_handle  *dlh;
1898         const char             *name = lname->ln_name;
1899         int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
1900         int got_def_acl = 0;
1901 #ifdef HAVE_QUOTA_SUPPORT
1902         struct obd_device *obd = mdd->mdd_obd_dev;
1903         struct obd_export *exp = md_quota(env)->mq_exp;
1904         struct mds_obd *mds = &obd->u.mds;
1905         unsigned int qcids[MAXQUOTAS] = { 0, 0 };
1906         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
1907         int quota_opc = 0, block_count = 0;
1908         int inode_pending[MAXQUOTAS] = { 0, 0 };
1909         int block_pending[MAXQUOTAS] = { 0, 0 };
1910         int parent_pending[MAXQUOTAS] = { 0, 0 };
1911 #endif
1912         ENTRY;
1913
1914         /*
1915          * Two operations have to be performed:
1916          *
1917          *  - an allocation of a new object (->do_create()), and
1918          *
1919          *  - an insertion into a parent index (->dio_insert()).
1920          *
1921          * Due to locking, operation order is not important, when both are
1922          * successful, *but* error handling cases are quite different:
1923          *
1924          *  - if insertion is done first, and following object creation fails,
1925          *  insertion has to be rolled back, but this operation might fail
1926          *  also leaving us with dangling index entry.
1927          *
1928          *  - if creation is done first, is has to be undone if insertion
1929          *  fails, leaving us with leaked space, which is neither good, nor
1930          *  fatal.
1931          *
1932          * It seems that creation-first is simplest solution, but it is
1933          * sub-optimal in the frequent
1934          *
1935          *         $ mkdir foo
1936          *         $ mkdir foo
1937          *
1938          * case, because second mkdir is bound to create object, only to
1939          * destroy it immediately.
1940          *
1941          * To avoid this follow local file systems that do double lookup:
1942          *
1943          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1944          *
1945          *     1. create            (mdd_object_create_internal())
1946          *
1947          *     2. insert            (__mdd_index_insert(), lookup again)
1948          */
1949
1950         /* Sanity checks before big job. */
1951         rc = mdd_create_sanity_check(env, pobj, lname, ma, spec);
1952         if (rc)
1953                 RETURN(rc);
1954
1955 #ifdef HAVE_QUOTA_SUPPORT
1956         if (mds->mds_quota) {
1957                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1958
1959                 rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
1960                 if (!rc) {
1961                         int same = 0;
1962
1963                         quota_opc = FSFILT_OP_CREATE;
1964                         mdd_quota_wrapper(&ma->ma_attr, qcids);
1965                         mdd_quota_wrapper(la_tmp, qpids);
1966                         /* get file quota for child */
1967                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1968                                         qcids, inode_pending, 1, NULL, 0, NULL,
1969                                         0);
1970                         switch (ma->ma_attr.la_mode & S_IFMT) {
1971                         case S_IFLNK:
1972                         case S_IFDIR:
1973                                 block_count = 2;
1974                                 break;
1975                         case S_IFREG:
1976                                 block_count = 1;
1977                                 break;
1978                         }
1979                         if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
1980                             qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
1981                                 block_count += 1;
1982                                 same = 1;
1983                         }
1984                         /* get block quota for child and parent */
1985                         if (block_count)
1986                                 lquota_chkquota(mds_quota_interface_ref, obd,
1987                                                 exp, qcids, block_pending,
1988                                                 block_count, NULL,
1989                                                 LQUOTA_FLAGS_BLK, NULL, 0);
1990                         if (!same)
1991                                 lquota_chkquota(mds_quota_interface_ref, obd,
1992                                                 exp, qpids, parent_pending, 1,
1993                                                 NULL, LQUOTA_FLAGS_BLK, NULL,
1994                                                 0);
1995                 }
1996         }
1997 #endif
1998
1999         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2000                 GOTO(out_pending, rc = -EINPROGRESS);
2001
2002         /*
2003          * No RPC inside the transaction, so OST objects should be created at
2004          * first.
2005          */
2006         if (S_ISREG(attr->la_mode)) {
2007                 lmm_size = ma->ma_lmm_size;
2008                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
2009                                     spec, ma);
2010                 if (rc)
2011                         GOTO(out_pending, rc);
2012         }
2013
2014         if (!S_ISLNK(attr->la_mode)) {
2015                 ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
2016                 ma_acl->ma_acl = info->mti_xattr_buf;
2017                 ma_acl->ma_need = MA_ACL_DEF;
2018                 ma_acl->ma_valid = 0;
2019
2020                 mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
2021                 rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
2022                 mdd_read_unlock(env, mdd_pobj);
2023                 if (rc)
2024                         GOTO(out_free, rc);
2025                 else if (ma_acl->ma_valid & MA_ACL_DEF)
2026                         got_def_acl = 1;
2027         }
2028
2029         handle = mdd_trans_create(env, mdd);
2030         if (IS_ERR(handle))
2031                 GOTO(out_free, rc = PTR_ERR(handle));
2032
2033         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma,
2034                                 lmm_size, handle, spec);
2035         if (rc)
2036                 GOTO(out_stop, rc);
2037
2038         rc = mdd_trans_start(env, mdd, handle);
2039         if (rc)
2040                 GOTO(out_stop, rc);
2041
2042         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
2043         if (dlh == NULL)
2044                 GOTO(out_trans, rc = -ENOMEM);
2045
2046         mdd_write_lock(env, son, MOR_TGT_CHILD);
2047         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
2048         if (rc) {
2049                 mdd_write_unlock(env, son);
2050                 GOTO(cleanup, rc);
2051         }
2052
2053         created = 1;
2054
2055 #ifdef CONFIG_FS_POSIX_ACL
2056         if (got_def_acl) {
2057                 struct lu_buf *acl_buf = &info->mti_buf;
2058                 acl_buf->lb_buf = ma_acl->ma_acl;
2059                 acl_buf->lb_len = ma_acl->ma_acl_size;
2060
2061                 rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
2062                 if (rc) {
2063                         mdd_write_unlock(env, son);
2064                         GOTO(cleanup, rc);
2065                 } else {
2066                         ma->ma_attr.la_valid |= LA_MODE;
2067                 }
2068         }
2069 #endif
2070
2071         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
2072                                    son, ma, handle, spec);
2073         mdd_write_unlock(env, son);
2074         if (rc)
2075                 /*
2076                  * Object has no links, so it will be destroyed when last
2077                  * reference is released. (XXX not now.)
2078                  */
2079                 GOTO(cleanup, rc);
2080
2081         initialized = 1;
2082
2083         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2084                                 name, S_ISDIR(attr->la_mode), handle,
2085                                 mdd_object_capa(env, mdd_pobj));
2086
2087         if (rc)
2088                 GOTO(cleanup, rc);
2089
2090         inserted = 1;
2091
2092         /* No need mdd_lsm_sanity_check here */
2093         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
2094         if (rc) {
2095                 CERROR("error on stripe info copy %d \n", rc);
2096                 GOTO(cleanup, rc);
2097         }
2098         if (lmm && lmm_size > 0) {
2099                 /* Set Lov here, do not get lmm again later */
2100                 if (lmm_size > ma->ma_lmm_size) {
2101                         /* Reply buffer is smaller, need bigger one */
2102                         mdd_max_lmm_buffer(env, lmm_size);
2103                         if (unlikely(info->mti_max_lmm == NULL))
2104                                 GOTO(cleanup, rc = -ENOMEM);
2105                         ma->ma_lmm = info->mti_max_lmm;
2106                         ma->ma_big_lmm_used = 1;
2107                 }
2108                 memcpy(ma->ma_lmm, lmm, lmm_size);
2109                 ma->ma_lmm_size = lmm_size;
2110                 ma->ma_valid |= MA_LOV;
2111         }
2112
2113         if (S_ISLNK(attr->la_mode)) {
2114                 struct md_ucred  *uc = md_ucred(env);
2115                 struct dt_object *dt = mdd_object_child(son);
2116                 const char *target_name = spec->u.sp_symname;
2117                 int sym_len = strlen(target_name);
2118                 const struct lu_buf *buf;
2119                 loff_t pos = 0;
2120
2121                 buf = mdd_buf_get_const(env, target_name, sym_len);
2122                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2123                                                 mdd_object_capa(env, son),
2124                                                 uc->mu_cap &
2125                                                 CFS_CAP_SYS_RESOURCE_MASK);
2126
2127                 if (rc == sym_len)
2128                         rc = 0;
2129                 else
2130                         GOTO(cleanup, rc = -EFAULT);
2131         }
2132
2133         *la = ma->ma_attr;
2134         la->la_valid = LA_CTIME | LA_MTIME;
2135         rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
2136         if (rc)
2137                 GOTO(cleanup, rc);
2138
2139         /* Return attr back. */
2140         rc = mdd_attr_get_internal_locked(env, son, ma);
2141         EXIT;
2142 cleanup:
2143         if (rc && created) {
2144                 int rc2 = 0;
2145
2146                 if (inserted) {
2147                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
2148                                                  S_ISDIR(attr->la_mode),
2149                                                  handle, BYPASS_CAPA);
2150                         if (rc2)
2151                                 CERROR("error can not cleanup destroy %d\n",
2152                                        rc2);
2153                 }
2154
2155                 if (rc2 == 0) {
2156                         mdd_write_lock(env, son, MOR_TGT_CHILD);
2157                         mdo_ref_del(env, son, handle);
2158                         if (initialized && S_ISDIR(attr->la_mode))
2159                                 mdo_ref_del(env, son, handle);
2160                         mdd_write_unlock(env, son);
2161                 }
2162         }
2163
2164         /* update lov_objid data, must be before transaction stop! */
2165         if (rc == 0)
2166                 mdd_lov_objid_update(mdd, lmm);
2167
2168         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
2169 out_trans:
2170         if (rc == 0)
2171                 rc = mdd_changelog_ns_store(env, mdd,
2172                             S_ISDIR(attr->la_mode) ? CL_MKDIR :
2173                             S_ISREG(attr->la_mode) ? CL_CREATE :
2174                             S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2175                             0, son, mdd_pobj, NULL, lname, handle);
2176 out_stop:
2177         mdd_trans_stop(env, mdd, rc, handle);
2178 out_free:
2179         /* finish lov_create stuff, free all temporary data */
2180         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
2181 out_pending:
2182 #ifdef HAVE_QUOTA_SUPPORT
2183         if (quota_opc) {
2184                 lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
2185                                       inode_pending, 0);
2186                 lquota_pending_commit(mds_quota_interface_ref, obd, qcids,
2187                                       block_pending, 1);
2188                 lquota_pending_commit(mds_quota_interface_ref, obd, qpids,
2189                                       parent_pending, 1);
2190                 /* Trigger dqacq on the owner of child and parent. If failed,
2191                  * the next call for lquota_chkquota will process it. */
2192                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
2193                               quota_opc);
2194         }
2195 #endif
2196         /* The child object shouldn't be cached anymore */
2197         if (rc)
2198                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
2199                             &child->mo_lu.lo_header->loh_flags);
2200         return rc;
2201 }
2202
2203 /*
2204  * Get locks on parents in proper order
2205  * RETURN: < 0 - error, rename_order if successful
2206  */
2207 enum rename_order {
2208         MDD_RN_SAME,
2209         MDD_RN_SRCTGT,
2210         MDD_RN_TGTSRC
2211 };
2212
2213 static int mdd_rename_order(const struct lu_env *env,
2214                             struct mdd_device *mdd,
2215                             struct mdd_object *src_pobj,
2216                             struct mdd_object *tgt_pobj)
2217 {
2218         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2219         int rc;
2220         ENTRY;
2221
2222         if (src_pobj == tgt_pobj)
2223                 RETURN(MDD_RN_SAME);
2224
2225         /* compared the parent child relationship of src_p&tgt_p */
2226         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2227                 rc = MDD_RN_SRCTGT;
2228         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2229                 rc = MDD_RN_TGTSRC;
2230         } else {
2231                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2232                 if (rc == -EREMOTE)
2233                         rc = 0;
2234
2235                 if (rc == 1)
2236                         rc = MDD_RN_TGTSRC;
2237                 else if (rc == 0)
2238                         rc = MDD_RN_SRCTGT;
2239         }
2240
2241         RETURN(rc);
2242 }
2243
2244 /* has not mdd_write{read}_lock on any obj yet. */
2245 static int mdd_rename_sanity_check(const struct lu_env *env,
2246                                    struct mdd_object *src_pobj,
2247                                    struct mdd_object *tgt_pobj,
2248                                    struct mdd_object *sobj,
2249                                    struct mdd_object *tobj,
2250                                    struct md_attr *ma)
2251 {
2252         int rc = 0;
2253         ENTRY;
2254
2255         if (unlikely(ma->ma_attr_flags & MDS_PERM_BYPASS))
2256                 RETURN(0);
2257
2258         /* XXX: when get here, sobj must NOT be NULL,
2259          * the other case has been processed in cml_rename
2260          * before mdd_rename and enable MDS_PERM_BYPASS. */
2261         LASSERT(sobj);
2262
2263         rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0);
2264         if (rc)
2265                 RETURN(rc);
2266
2267         /* XXX: when get here, "tobj == NULL" means tobj must
2268          * NOT exist (neither on remote MDS, such case has been
2269          * processed in cml_rename before mdd_rename and enable
2270          * MDS_PERM_BYPASS).
2271          * So check may_create, but not check may_unlink. */
2272         if (!tobj)
2273                 rc = mdd_may_create(env, tgt_pobj, NULL,
2274                                     (src_pobj != tgt_pobj), 0);
2275         else
2276                 rc = mdd_may_delete(env, tgt_pobj, tobj, ma,
2277                                     (src_pobj != tgt_pobj), 1);
2278
2279         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2280             S_ISDIR(ma->ma_attr.la_mode))
2281                 rc = __mdd_may_link(env, tgt_pobj);
2282
2283         RETURN(rc);
2284 }
2285
2286 static int mdd_declare_rename(const struct lu_env *env,
2287                               struct mdd_device *mdd,
2288                               struct mdd_object *mdd_spobj,
2289                               struct mdd_object *mdd_tpobj,
2290                               struct mdd_object *mdd_sobj,
2291                               struct mdd_object *mdd_tobj,
2292                               const struct lu_name *sname,
2293                               const struct lu_name *tname,
2294                               struct md_attr *ma,
2295                               struct thandle *handle)
2296 {
2297         int rc;
2298
2299         LASSERT(mdd_spobj);
2300         LASSERT(mdd_tpobj);
2301         LASSERT(mdd_sobj);
2302
2303         /* name from source dir */
2304         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2305         if (rc)
2306                 return rc;
2307
2308         /* .. from source child */
2309         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2310                 /* source child can be directory,
2311                  * counted by source dir's nlink */
2312                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2313                 if (rc)
2314                         return rc;
2315
2316                 rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
2317                 if (rc)
2318                         return rc;
2319
2320                 rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
2321                                               dotdot, handle);
2322                 if (rc)
2323                         return rc;
2324
2325                 /* new target child can be directory,
2326                  * counted by target dir's nlink */
2327                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2328                 if (rc)
2329                         return rc;
2330
2331         }
2332
2333         rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
2334         if (rc)
2335                 return rc;
2336
2337         rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
2338         if (rc)
2339                 return rc;
2340         mdd_declare_links_add(env, mdd_sobj, handle);
2341         if (rc)
2342                 return rc;
2343
2344         rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
2345         if (rc)
2346                 return rc;
2347
2348         /* new name */
2349         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2350                         tname->ln_name, handle);
2351         if (rc)
2352                 return rc;
2353
2354         /* name from target dir (old name), we declare it unconditionally
2355          * as mdd_rename() calls delete unconditionally as well. so just
2356          * to balance declarations vs calls to change ... */
2357         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2358         if (rc)
2359                 return rc;
2360
2361         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2362                 /* delete target child in target parent directory */
2363                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2364                 if (rc)
2365                         return rc;
2366
2367                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2368                         /* target child can be directory,
2369                          * delete "." reference in target child directory */
2370                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2371                         if (rc)
2372                                 return rc;
2373
2374                         /* delete ".." reference in target parent directory */
2375                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2376                         if (rc)
2377                                 return rc;
2378                 }
2379
2380                 rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
2381                 if (rc)
2382                         return rc;
2383
2384                 mdd_declare_links_add(env, mdd_tobj, handle);
2385                 if (rc)
2386                         return rc;
2387
2388                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2389                 if (rc)
2390                         return rc;
2391         }
2392
2393         rc = mdd_declare_changelog_store(env, mdd, tname, handle);
2394         if (rc)
2395                 return rc;
2396
2397         rc = mdd_declare_changelog_store(env, mdd, sname, handle);
2398         if (rc)
2399                 return rc;
2400
2401         return rc;
2402 }
2403
2404 /* src object can be remote that is why we use only fid and type of object */
2405 static int mdd_rename(const struct lu_env *env,
2406                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2407                       const struct lu_fid *lf, const struct lu_name *lsname,
2408                       struct md_object *tobj, const struct lu_name *ltname,
2409                       struct md_attr *ma)
2410 {
2411         const char *sname = lsname->ln_name;
2412         const char *tname = ltname->ln_name;
2413         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2414         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2415         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2416         struct mdd_device *mdd = mdo2mdd(src_pobj);
2417         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2418         struct mdd_object *mdd_tobj = NULL;
2419         struct dynlock_handle *sdlh, *tdlh;
2420         struct thandle *handle;
2421         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2422         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2423         int is_dir;
2424         int rc, rc2;
2425
2426 #ifdef HAVE_QUOTA_SUPPORT
2427         struct obd_device *obd = mdd->mdd_obd_dev;
2428         struct obd_export *exp = md_quota(env)->mq_exp;
2429         struct mds_obd *mds = &obd->u.mds;
2430         unsigned int qspids[MAXQUOTAS] = { 0, 0 };
2431         unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
2432         unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
2433         int quota_copc = 0, quota_popc = 0;
2434         int rec_pending[MAXQUOTAS] = { 0, 0 };
2435 #endif
2436         ENTRY;
2437
2438         LASSERT(ma->ma_attr.la_mode & S_IFMT);
2439         is_dir = S_ISDIR(ma->ma_attr.la_mode);
2440
2441         if (tobj)
2442                 mdd_tobj = md2mdd_obj(tobj);
2443
2444 #ifdef HAVE_QUOTA_SUPPORT
2445         if (mds->mds_quota) {
2446                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
2447
2448                 rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
2449                 if (!rc) {
2450                         mdd_quota_wrapper(la_tmp, qspids);
2451                         if (!tobj) {
2452                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
2453                                                 BYPASS_CAPA);
2454                                 if (!rc) {
2455                                         void *data = NULL;
2456                                         mdd_data_get(env, mdd_tpobj, &data);
2457                                         quota_popc = FSFILT_OP_LINK;
2458                                         mdd_quota_wrapper(la_tmp, qtpids);
2459                                         /* get block quota for target parent */
2460                                         lquota_chkquota(mds_quota_interface_ref,
2461                                                         obd, exp, qtpids,
2462                                                         rec_pending, 1, NULL,
2463                                                         LQUOTA_FLAGS_BLK,
2464                                                         data, 1);
2465                                 }
2466                         }
2467                 }
2468         }
2469 #endif
2470         mdd_sobj = mdd_object_find(env, mdd, lf);
2471
2472         handle = mdd_trans_create(env, mdd);
2473         if (IS_ERR(handle))
2474                 GOTO(out_pending, rc = PTR_ERR(handle));
2475
2476         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2477                                 mdd_tobj, lsname, ltname, ma, handle);
2478         if (rc)
2479                 GOTO(stop, rc);
2480
2481         rc = mdd_trans_start(env, mdd, handle);
2482         if (rc)
2483                 GOTO(stop, rc);
2484
2485         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2486         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2487         if (rc < 0)
2488                 GOTO(cleanup_unlocked, rc);
2489
2490         /* Get locks in determined order */
2491         if (rc == MDD_RN_SAME) {
2492                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2493                                           sname, MOR_SRC_PARENT);
2494                 /* check hashes to determine do we need one lock or two */
2495                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2496                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2497                                 MOR_TGT_PARENT);
2498                 else
2499                         tdlh = sdlh;
2500         } else if (rc == MDD_RN_SRCTGT) {
2501                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2502                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2503         } else {
2504                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2505                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2506         }
2507         if (sdlh == NULL || tdlh == NULL)
2508                 GOTO(cleanup, rc = -ENOMEM);
2509
2510         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
2511                                      mdd_sobj, mdd_tobj, ma);
2512         if (rc)
2513                 GOTO(cleanup, rc);
2514
2515         /* Remove source name from source directory */
2516         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2517                                 mdd_object_capa(env, mdd_spobj));
2518         if (rc)
2519                 GOTO(cleanup, rc);
2520
2521         /* "mv dir1 dir2" needs "dir1/.." link update */
2522         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2523                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2524                                         mdd_object_capa(env, mdd_sobj));
2525                 if (rc)
2526                         GOTO(fixup_spobj2, rc);
2527
2528                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2529                                       handle, mdd_object_capa(env, mdd_sobj));
2530                 if (rc)
2531                         GOTO(fixup_spobj, rc);
2532         }
2533
2534         /* Remove target name from target directory
2535          * Here tobj can be remote one, so we do index_delete unconditionally
2536          * and -ENOENT is allowed.
2537          */
2538         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2539                                 mdd_object_capa(env, mdd_tpobj));
2540         if (rc != 0) {
2541                 if (mdd_tobj) {
2542                         /* tname might been renamed to something else */
2543                         GOTO(fixup_spobj, rc);
2544                 }
2545                 if (rc != -ENOENT)
2546                         GOTO(fixup_spobj, rc);
2547         }
2548
2549         /* Insert new fid with target name into target dir */
2550         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2551                                 mdd_object_capa(env, mdd_tpobj));
2552         if (rc)
2553                 GOTO(fixup_tpobj, rc);
2554
2555         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2556         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2557
2558         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2559         if (mdd_sobj) {
2560                 la->la_valid = LA_CTIME;
2561                 rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
2562                                                         handle, 0);
2563                 if (rc)
2564                         GOTO(fixup_tpobj, rc);
2565         }
2566
2567         /* Remove old target object
2568          * For tobj is remote case cmm layer has processed
2569          * and set tobj to NULL then. So when tobj is NOT NULL,
2570          * it must be local one.
2571          */
2572         if (tobj && mdd_object_exists(mdd_tobj)) {
2573                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2574                 if (mdd_is_dead_obj(mdd_tobj)) {
2575                         mdd_write_unlock(env, mdd_tobj);
2576                         /* shld not be dead, something is wrong */
2577                         CERROR("tobj is dead, something is wrong\n");
2578                         rc = -EINVAL;
2579                         goto cleanup;
2580                 }
2581                 mdo_ref_del(env, mdd_tobj, handle);
2582
2583                 /* Remove dot reference. */
2584                 if (is_dir)
2585                         mdo_ref_del(env, mdd_tobj, handle);
2586
2587                 la->la_valid = LA_CTIME;
2588                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2589                 if (rc)
2590                         GOTO(fixup_tpobj, rc);
2591
2592                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2593                 mdd_write_unlock(env, mdd_tobj);
2594                 if (rc)
2595                         GOTO(fixup_tpobj, rc);
2596
2597 #ifdef HAVE_QUOTA_SUPPORT
2598                 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2599                     ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
2600                         quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2601                         mdd_quota_wrapper(&ma->ma_attr, qtcids);
2602                 }
2603 #endif
2604         }
2605
2606         la->la_valid = LA_CTIME | LA_MTIME;
2607         rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
2608         if (rc)
2609                 GOTO(fixup_tpobj, rc);
2610
2611         if (mdd_spobj != mdd_tpobj) {
2612                 la->la_valid = LA_CTIME | LA_MTIME;
2613                 rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
2614                                                   handle, 0);
2615         }
2616
2617         if (rc == 0 && mdd_sobj) {
2618                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2619                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2620                                       mdo2fid(mdd_tpobj), ltname, handle);
2621                 if (rc == -ENOENT)
2622                         /* Old files might not have EA entry */
2623                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2624                                       lsname, handle, 0);
2625                 mdd_write_unlock(env, mdd_sobj);
2626                 /* We don't fail the transaction if the link ea can't be
2627                    updated -- fid2path will use alternate lookup method. */
2628                 rc = 0;
2629         }
2630
2631         EXIT;
2632
2633 fixup_tpobj:
2634         if (rc) {
2635                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2636                                          BYPASS_CAPA);
2637                 if (rc2)
2638                         CWARN("tp obj fix error %d\n",rc2);
2639
2640                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2641                     !mdd_is_dead_obj(mdd_tobj)) {
2642                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2643                                          mdo2fid(mdd_tobj), tname,
2644                                          is_dir, handle,
2645                                          BYPASS_CAPA);
2646
2647                         if (rc2)
2648                                 CWARN("tp obj fix error %d\n",rc2);
2649                 }
2650         }
2651
2652 fixup_spobj:
2653         if (rc && is_dir && mdd_sobj) {
2654                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2655                                               BYPASS_CAPA);
2656
2657                 if (rc2)
2658                         CWARN("sp obj dotdot delete error %d\n",rc2);
2659
2660
2661                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2662                                               dotdot, handle, BYPASS_CAPA);
2663                 if (rc2)
2664                         CWARN("sp obj dotdot insert error %d\n",rc2);
2665         }
2666
2667 fixup_spobj2:
2668         if (rc) {
2669                 rc2 = __mdd_index_insert(env, mdd_spobj,
2670                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2671                 if (rc2)
2672                         CWARN("sp obj fix error %d\n",rc2);
2673         }
2674 cleanup:
2675         if (likely(tdlh) && sdlh != tdlh)
2676                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2677         if (likely(sdlh))
2678                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2679 cleanup_unlocked:
2680         if (rc == 0)
2681                 rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, 0, mdd_tobj,
2682                                             mdd_spobj, lf, lsname, handle);
2683         if (rc == 0) {
2684                 struct lu_fid zero_fid;
2685                 fid_zero(&zero_fid);
2686                 /* If the rename target exist, The CL_EXT record should save
2687                  * the target fid as tfid, otherwise, use zero fid. LU-543 */
2688                 rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj,
2689                                             mdd_tpobj,
2690                                             mdd_tobj ? NULL : &zero_fid,
2691                                             ltname, handle);
2692         }
2693
2694 stop:
2695         mdd_trans_stop(env, mdd, rc, handle);
2696         if (mdd_sobj)
2697                 mdd_object_put(env, mdd_sobj);
2698 out_pending:
2699 #ifdef HAVE_QUOTA_SUPPORT
2700         if (mds->mds_quota) {
2701                 if (quota_popc)
2702                         lquota_pending_commit(mds_quota_interface_ref, obd,
2703                                               qtpids, rec_pending, 1);
2704
2705                 if (quota_copc) {
2706                         /* Trigger dqrel on the source owner of parent.
2707                          * If failed, the next call for lquota_chkquota will
2708                          * process it. */
2709                         lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
2710                                       FSFILT_OP_UNLINK_PARTIAL_PARENT);
2711
2712                         /* Trigger dqrel on the target owner of child.
2713                          * If failed, the next call for lquota_chkquota
2714                          * will process it. */
2715                         lquota_adjust(mds_quota_interface_ref, obd, qtcids,
2716                                       qtpids, rc, quota_copc);
2717                 }
2718         }
2719 #endif
2720         return rc;
2721 }
2722
2723 /** enable/disable storing of hardlink info */
2724 int mdd_linkea_enable = 1;
2725 CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644,
2726                 "record hardlink info in EAs");
2727
2728 /** Read the link EA into a temp buffer.
2729  * Uses the name_buf since it is generally large.
2730  * \retval IS_ERR err
2731  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2732  */
2733 struct lu_buf *mdd_links_get(const struct lu_env *env,
2734                              struct mdd_object *mdd_obj)
2735 {
2736         struct lu_buf *buf;
2737         struct lustre_capa *capa;
2738         struct link_ea_header *leh;
2739         int rc;
2740
2741         /* First try a small buf */
2742         buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2743         if (buf->lb_buf == NULL)
2744                 return ERR_PTR(-ENOMEM);
2745
2746         capa = mdd_object_capa(env, mdd_obj);
2747         rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2748         if (rc == -ERANGE) {
2749                 /* Buf was too small, figure out what we need. */
2750                 mdd_buf_put(buf);
2751                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2752                 if (rc < 0)
2753                         return ERR_PTR(rc);
2754                 buf = mdd_buf_alloc(env, rc);
2755                 if (buf->lb_buf == NULL)
2756                         return ERR_PTR(-ENOMEM);
2757                 rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
2758         }
2759         if (rc < 0)
2760                 return ERR_PTR(rc);
2761
2762         leh = buf->lb_buf;
2763         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2764                 leh->leh_magic = LINK_EA_MAGIC;
2765                 leh->leh_reccount = __swab32(leh->leh_reccount);
2766                 leh->leh_len = __swab64(leh->leh_len);
2767                 /* entries are swabbed by mdd_lee_unpack */
2768         }
2769         if (leh->leh_magic != LINK_EA_MAGIC)
2770                 return ERR_PTR(-EINVAL);
2771         if (leh->leh_reccount == 0)
2772                 return ERR_PTR(-ENODATA);
2773
2774         return buf;
2775 }
2776
2777 /** Pack a link_ea_entry.
2778  * All elements are stored as chars to avoid alignment issues.
2779  * Numbers are always big-endian
2780  * \retval record length
2781  */
2782 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2783                         const struct lu_fid *pfid)
2784 {
2785         struct lu_fid   tmpfid;
2786         int             reclen;
2787
2788         fid_cpu_to_be(&tmpfid, pfid);
2789         memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
2790         memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
2791         reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
2792
2793         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2794         lee->lee_reclen[1] = reclen & 0xff;
2795         return reclen;
2796 }
2797
2798 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2799                     struct lu_name *lname, struct lu_fid *pfid)
2800 {
2801         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2802         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
2803         fid_be_to_cpu(pfid, pfid);
2804         lname->ln_name = lee->lee_name;
2805         lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
2806 }
2807
2808 /** Add a record to the end of link ea buf */
2809 static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
2810                            const struct lu_fid *pfid,
2811                            const struct lu_name *lname)
2812 {
2813         struct link_ea_header *leh;
2814         struct link_ea_entry *lee;
2815         int reclen;
2816
2817         if (lname == NULL || pfid == NULL)
2818                 return -EINVAL;
2819
2820         /* Make sure our buf is big enough for the new one */
2821         leh = buf->lb_buf;
2822         reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2823         if (leh->leh_len + reclen > buf->lb_len) {
2824                 if (mdd_buf_grow(env, leh->leh_len + reclen) < 0)
2825                         return -ENOMEM;
2826         }
2827
2828         leh = buf->lb_buf;
2829         lee = buf->lb_buf + leh->leh_len;
2830         reclen = mdd_lee_pack(lee, lname, pfid);
2831         leh->leh_len += reclen;
2832         leh->leh_reccount++;
2833         return 0;
2834 }
2835
2836 static int mdd_declare_links_add(const struct lu_env *env,
2837                                  struct mdd_object *mdd_obj,
2838                                  struct thandle *handle)
2839 {
2840         int rc;
2841
2842         /* XXX: max size? */
2843         rc = mdo_declare_xattr_set(env, mdd_obj,
2844                              mdd_buf_get_const(env, NULL, 4096),
2845                              XATTR_NAME_LINK, 0, handle);
2846
2847         return rc;
2848 }
2849
2850 /* For pathologic linkers, we don't want to spend lots of time scanning the
2851  * link ea.  Limit ourseleves to something reasonable; links not in the EA
2852  * can be looked up via (slower) parent lookup.
2853  */
2854 #define LINKEA_MAX_COUNT 128
2855
2856 static int mdd_links_add(const struct lu_env *env,
2857                          struct mdd_object *mdd_obj,
2858                          const struct lu_fid *pfid,
2859                          const struct lu_name *lname,
2860                          struct thandle *handle, int first)
2861 {
2862         struct lu_buf *buf;
2863         struct link_ea_header *leh;
2864         int rc;
2865         ENTRY;
2866
2867         if (!mdd_linkea_enable)
2868                 RETURN(0);
2869
2870         buf = first ? ERR_PTR(-ENODATA) : mdd_links_get(env, mdd_obj);
2871         if (IS_ERR(buf)) {
2872                 rc = PTR_ERR(buf);
2873                 if (rc != -ENODATA) {
2874                         CERROR("link_ea read failed %d "DFID"\n", rc,
2875                                PFID(mdd_object_fid(mdd_obj)));
2876                         RETURN (rc);
2877                 }
2878                 /* empty EA; start one */
2879                 buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2880                 if (buf->lb_buf == NULL)
2881                         RETURN(-ENOMEM);
2882                 leh = buf->lb_buf;
2883                 leh->leh_magic = LINK_EA_MAGIC;
2884                 leh->leh_len = sizeof(struct link_ea_header);
2885                 leh->leh_reccount = 0;
2886         }
2887
2888         leh = buf->lb_buf;
2889         if (leh->leh_reccount > LINKEA_MAX_COUNT)
2890                 RETURN(-EOVERFLOW);
2891
2892         rc = __mdd_links_add(env, buf, pfid, lname);
2893         if (rc)
2894                 RETURN(rc);
2895
2896         leh = buf->lb_buf;
2897         rc = __mdd_xattr_set(env, mdd_obj,
2898                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2899                              XATTR_NAME_LINK, 0, handle);
2900         if (rc) {
2901                 if (rc == -ENOSPC)
2902                         CDEBUG(D_INODE, "link_ea add failed %d "DFID"\n", rc,
2903                                PFID(mdd_object_fid(mdd_obj)));
2904                 else
2905                         CERROR("link_ea add failed %d "DFID"\n", rc,
2906                                PFID(mdd_object_fid(mdd_obj)));
2907         }
2908
2909         if (buf->lb_len > OBD_ALLOC_BIG)
2910                 /* if we vmalloced a large buffer drop it */
2911                 mdd_buf_put(buf);
2912
2913         RETURN (rc);
2914 }
2915
2916 static int mdd_links_rename(const struct lu_env *env,
2917                             struct mdd_object *mdd_obj,
2918                             const struct lu_fid *oldpfid,
2919                             const struct lu_name *oldlname,
2920                             const struct lu_fid *newpfid,
2921                             const struct lu_name *newlname,
2922                             struct thandle *handle)
2923 {
2924         struct lu_buf  *buf;
2925         struct link_ea_header *leh;
2926         struct link_ea_entry  *lee;
2927         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
2928         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2929         int reclen = 0;
2930         int count;
2931         int rc, rc2 = 0;
2932         ENTRY;
2933
2934         if (!mdd_linkea_enable)
2935                 RETURN(0);
2936
2937         if (mdd_obj->mod_flags & DEAD_OBJ)
2938                 /* No more links, don't bother */
2939                 RETURN(0);
2940
2941         buf = mdd_links_get(env, mdd_obj);
2942         if (IS_ERR(buf)) {
2943                 rc = PTR_ERR(buf);
2944                 if (rc == -ENODATA)
2945                         CDEBUG(D_INODE, "link_ea read failed %d "DFID"\n",
2946                                rc, PFID(mdd_object_fid(mdd_obj)));
2947                 else
2948                         CERROR("link_ea read failed %d "DFID"\n",
2949                                rc, PFID(mdd_object_fid(mdd_obj)));
2950                 RETURN(rc);
2951         }
2952         leh = buf->lb_buf;
2953         lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
2954
2955         /* Find the old record */
2956         for(count = 0; count < leh->leh_reccount; count++) {
2957                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
2958                 if (tmpname->ln_namelen == oldlname->ln_namelen &&
2959                     lu_fid_eq(tmpfid, oldpfid) &&
2960                     (strncmp(tmpname->ln_name, oldlname->ln_name,
2961                              tmpname->ln_namelen) == 0))
2962                         break;
2963                 lee = (struct link_ea_entry *)((char *)lee + reclen);
2964         }
2965         if ((count + 1) > leh->leh_reccount) {
2966                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2967                        oldlname->ln_namelen, oldlname->ln_name);
2968                 GOTO(out, rc = -ENOENT);
2969         }
2970
2971         /* Remove the old record */
2972         leh->leh_reccount--;
2973         leh->leh_len -= reclen;
2974         memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len -
2975                 (char *)lee);
2976
2977         /* If renaming, add the new record */
2978         if (newpfid != NULL) {
2979                 /* if the add fails, we still delete the out-of-date old link */
2980                 rc2 = __mdd_links_add(env, buf, newpfid, newlname);
2981                 leh = buf->lb_buf;
2982         }
2983
2984         rc = __mdd_xattr_set(env, mdd_obj,
2985                              mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
2986                              XATTR_NAME_LINK, 0, handle);
2987
2988 out:
2989         if (rc == 0)
2990                 rc = rc2;
2991         if (rc)
2992                 CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n",
2993                        oldlname->ln_namelen, oldlname->ln_name, rc,
2994                        PFID(mdd_object_fid(mdd_obj)));
2995
2996         if (buf->lb_len > OBD_ALLOC_BIG)
2997                 /* if we vmalloced a large buffer drop it */
2998                 mdd_buf_put(buf);
2999
3000         RETURN (rc);
3001 }
3002
3003 const struct md_dir_operations mdd_dir_ops = {
3004         .mdo_is_subdir     = mdd_is_subdir,
3005         .mdo_lookup        = mdd_lookup,
3006         .mdo_create        = mdd_create,
3007         .mdo_rename        = mdd_rename,
3008         .mdo_link          = mdd_link,
3009         .mdo_unlink        = mdd_unlink,
3010         .mdo_lum_lmm_cmp   = mdd_lum_lmm_cmp,
3011         .mdo_name_insert   = mdd_name_insert,
3012         .mdo_name_remove   = mdd_name_remove,
3013         .mdo_rename_tgt    = mdd_rename_tgt,
3014         .mdo_create_data   = mdd_create_data,
3015 };