Whamcloud - gitweb
9f5b6f3785b0966fb3634d9d300d414eb7ef289f
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
61                         const struct lu_name *lname, struct lu_fid* fid,
62                         int mask);
63 static inline int mdd_links_add(const struct lu_env *env,
64                                 struct mdd_object *mdd_obj,
65                                 const struct lu_fid *pfid,
66                                 const struct lu_name *lname,
67                                 struct thandle *handle, int first);
68 static inline int mdd_links_del(const struct lu_env *env,
69                                 struct mdd_object *mdd_obj,
70                                 const struct lu_fid *pfid,
71                                 const struct lu_name *lname,
72                                 struct thandle *handle);
73
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         const char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 int mdd_lookup(const struct lu_env *env,
93                struct md_object *pobj, const struct lu_name *lname,
94                struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102 int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
103                    struct lu_fid *fid)
104 {
105         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
106 }
107
108 /*
109  * For root fid use special function, which does not compare version component
110  * of fid. Version component is different for root fids on all MDTs.
111  */
112 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
113 {
114         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
115                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
116 }
117
118 /*
119  * return 1: if lf is the fid of the ancestor of p1;
120  * return 0: if not;
121  *
122  * return -EREMOTE: if remote object is found, in this
123  * case fid of remote object is saved to @pf;
124  *
125  * otherwise: values < 0, errors.
126  */
127 static int mdd_is_parent(const struct lu_env *env,
128                          struct mdd_device *mdd,
129                          struct mdd_object *p1,
130                          const struct lu_fid *lf,
131                          struct lu_fid *pf)
132 {
133         struct mdd_object *parent = NULL;
134         struct lu_fid *pfid;
135         int rc;
136         ENTRY;
137
138         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
139         pfid = &mdd_env_info(env)->mti_fid;
140
141         /* Check for root first. */
142         if (mdd_is_root(mdd, mdo2fid(p1)))
143                 RETURN(0);
144
145         for(;;) {
146                 /* this is done recursively, bypass capa for each obj */
147                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
148                 rc = mdd_parent_fid(env, p1, pfid);
149                 if (rc)
150                         GOTO(out, rc);
151                 if (mdd_is_root(mdd, pfid))
152                         GOTO(out, rc = 0);
153                 if (lu_fid_eq(pfid, lf))
154                         GOTO(out, rc = 1);
155                 if (parent)
156                         mdd_object_put(env, parent);
157
158                 parent = mdd_object_find(env, mdd, pfid);
159                 if (IS_ERR(parent)) {
160                         GOTO(out, rc = PTR_ERR(parent));
161                 } else if (mdd_object_remote(parent)) {
162                         /*FIXME: Because of the restriction of rename in Phase I.
163                          * If the parent is remote, we just assumed lf is not the
164                          * parent of P1 for now */
165                         GOTO(out, rc = 0);
166                 }
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
188                   const struct lu_fid *fid, struct lu_fid *sfid)
189 {
190         struct mdd_device *mdd = mdo2mdd(mo);
191         int rc;
192         ENTRY;
193
194         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
195                 RETURN(0);
196
197         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
198         if (rc == 0) {
199                 /* found root */
200                 fid_zero(sfid);
201         } else if (rc == 1) {
202                 /* found @fid is parent */
203                 *sfid = *fid;
204                 rc = 0;
205         }
206         RETURN(rc);
207 }
208
209 /*
210  * Check that @dir contains no entries except (possibly) dot and dotdot.
211  *
212  * Returns:
213  *
214  *             0        empty
215  *      -ENOTDIR        not a directory object
216  *    -ENOTEMPTY        not empty
217  *           -ve        other error
218  *
219  */
220 static int mdd_dir_is_empty(const struct lu_env *env,
221                             struct mdd_object *dir)
222 {
223         struct dt_it     *it;
224         struct dt_object *obj;
225         const struct dt_it_ops *iops;
226         int result;
227         ENTRY;
228
229         obj = mdd_object_child(dir);
230         if (!dt_try_as_dir(env, obj))
231                 RETURN(-ENOTDIR);
232
233         iops = &obj->do_index_ops->dio_it;
234         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
235         if (!IS_ERR(it)) {
236                 result = iops->get(env, it, (const void *)"");
237                 if (result > 0) {
238                         int i;
239                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
240                                 result = iops->next(env, it);
241                         if (result == 0)
242                                 result = -ENOTEMPTY;
243                         else if (result == +1)
244                                 result = 0;
245                 } else if (result == 0)
246                         /*
247                          * Huh? Index contains no zero key?
248                          */
249                         result = -EIO;
250
251                 iops->put(env, it);
252                 iops->fini(env, it);
253         } else
254                 result = PTR_ERR(it);
255         RETURN(result);
256 }
257
258 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
259 {
260         struct mdd_device *m = mdd_obj2mdd_dev(obj);
261         struct lu_attr *la = &mdd_env_info(env)->mti_la;
262         int rc;
263         ENTRY;
264
265         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
266         if (rc)
267                 RETURN(rc);
268
269         /*
270          * Subdir count limitation can be broken through.
271          */
272         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
273             !S_ISDIR(la->la_mode))
274                 RETURN(-EMLINK);
275         else
276                 RETURN(0);
277 }
278
279 /*
280  * Check whether it may create the cobj under the pobj.
281  * cobj maybe NULL
282  */
283 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
284                    struct mdd_object *cobj, int check_perm, int check_nlink)
285 {
286         int rc = 0;
287         ENTRY;
288
289         if (cobj && mdd_object_exists(cobj))
290                 RETURN(-EEXIST);
291
292         if (mdd_is_dead_obj(pobj))
293                 RETURN(-ENOENT);
294
295         if (check_perm)
296                 rc = mdd_permission_internal_locked(env, pobj, NULL,
297                                                     MAY_WRITE | MAY_EXEC,
298                                                     MOR_TGT_PARENT);
299         if (!rc && check_nlink)
300                 rc = __mdd_may_link(env, pobj);
301
302         RETURN(rc);
303 }
304
305 /*
306  * Check whether can unlink from the pobj in the case of "cobj == NULL".
307  */
308 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
309                    const struct lu_attr *attr)
310 {
311         int rc;
312         ENTRY;
313
314         if (mdd_is_dead_obj(pobj))
315                 RETURN(-ENOENT);
316
317         if ((attr->la_valid & LA_FLAGS) &&
318             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
319                 RETURN(-EPERM);
320
321         rc = mdd_permission_internal_locked(env, pobj, NULL,
322                                             MAY_WRITE | MAY_EXEC,
323                                             MOR_TGT_PARENT);
324         if (rc)
325                 RETURN(rc);
326
327         if (mdd_is_append(pobj))
328                 RETURN(-EPERM);
329
330         RETURN(rc);
331 }
332
333 /*
334  * pobj == NULL is remote ops case, under such case, pobj's
335  * VTX feature has been checked already, no need check again.
336  */
337 static inline int mdd_is_sticky(const struct lu_env *env,
338                                 struct mdd_object *pobj,
339                                 struct mdd_object *cobj)
340 {
341         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
342         struct lu_ucred *uc = lu_ucred_assert(env);
343         int rc;
344
345         if (pobj) {
346                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
347                 if (rc)
348                         return rc;
349
350                 if (!(tmp_la->la_mode & S_ISVTX) ||
351                     (tmp_la->la_uid == uc->uc_fsuid))
352                         return 0;
353         }
354
355         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
356         if (rc)
357                 return rc;
358
359         if (tmp_la->la_uid == uc->uc_fsuid)
360                 return 0;
361
362         return !md_capable(uc, CFS_CAP_FOWNER);
363 }
364
365 static int mdd_may_delete_entry(const struct lu_env *env,
366                                 struct mdd_object *pobj, int check_perm)
367 {
368         ENTRY;
369
370         LASSERT(pobj != NULL);
371         if (!mdd_object_exists(pobj))
372                 RETURN(-ENOENT);
373
374         if (mdd_is_dead_obj(pobj))
375                 RETURN(-ENOENT);
376
377         if (check_perm) {
378                 int rc;
379                 rc = mdd_permission_internal_locked(env, pobj, NULL,
380                                             MAY_WRITE | MAY_EXEC,
381                                             MOR_TGT_PARENT);
382                 if (rc)
383                         RETURN(rc);
384         }
385
386         if (mdd_is_append(pobj))
387                 RETURN(-EPERM);
388
389         RETURN(0);
390 }
391
392 /*
393  * Check whether it may delete the cobj from the pobj.
394  * pobj maybe NULL
395  */
396 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
397                    struct mdd_object *cobj, struct lu_attr *cattr,
398                    struct lu_attr *src_attr, int check_perm, int check_empty)
399 {
400         int rc = 0;
401         ENTRY;
402
403         if (pobj) {
404                 rc = mdd_may_delete_entry(env, pobj, check_perm);
405                 if (rc != 0)
406                         RETURN(rc);
407         }
408
409         if (cobj == NULL)
410                 RETURN(0);
411
412         if (!mdd_object_exists(cobj))
413                 RETURN(-ENOENT);
414
415         if (mdd_is_dead_obj(cobj))
416                 RETURN(-ESTALE);
417
418
419         if (mdd_is_sticky(env, pobj, cobj))
420                 RETURN(-EPERM);
421
422         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
423                 RETURN(-EPERM);
424
425         if ((cattr->la_valid & LA_FLAGS) &&
426             (cattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
427                 RETURN(-EPERM);
428
429         /* additional check the rename case */
430         if (src_attr) {
431                 if (S_ISDIR(src_attr->la_mode)) {
432                         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
433
434                         if (!S_ISDIR(cattr->la_mode))
435                                 RETURN(-ENOTDIR);
436
437                         if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
438                                 RETURN(-EBUSY);
439                 } else if (S_ISDIR(cattr->la_mode))
440                         RETURN(-EISDIR);
441         }
442
443         if (S_ISDIR(cattr->la_mode) && check_empty)
444                 rc = mdd_dir_is_empty(env, cobj);
445
446         RETURN(rc);
447 }
448
449 /*
450  * tgt maybe NULL
451  * has mdd_write_lock on src already, but not on tgt yet
452  */
453 int mdd_link_sanity_check(const struct lu_env *env,
454                           struct mdd_object *tgt_obj,
455                           const struct lu_name *lname,
456                           struct mdd_object *src_obj)
457 {
458         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
459         int rc = 0;
460         ENTRY;
461
462         if (!mdd_object_exists(src_obj))
463                 RETURN(-ENOENT);
464
465         if (mdd_is_dead_obj(src_obj))
466                 RETURN(-ESTALE);
467
468         /* Local ops, no lookup before link, check filename length here. */
469         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
470                 RETURN(-ENAMETOOLONG);
471
472         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
473                 RETURN(-EPERM);
474
475         if (S_ISDIR(mdd_object_type(src_obj)))
476                 RETURN(-EPERM);
477
478         LASSERT(src_obj != tgt_obj);
479         if (tgt_obj) {
480                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
481                 if (rc)
482                         RETURN(rc);
483         }
484
485         rc = __mdd_may_link(env, src_obj);
486
487         RETURN(rc);
488 }
489
490 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
491                                    const char *name, struct thandle *handle,
492                                    struct lustre_capa *capa)
493 {
494         struct dt_object *next = mdd_object_child(pobj);
495         int               rc;
496         ENTRY;
497
498         if (dt_try_as_dir(env, next)) {
499                 rc = next->do_index_ops->dio_delete(env, next,
500                                                     (struct dt_key *)name,
501                                                     handle, capa);
502         } else
503                 rc = -ENOTDIR;
504
505         RETURN(rc);
506 }
507
508 static int __mdd_index_insert_only(const struct lu_env *env,
509                                    struct mdd_object *pobj,
510                                    const struct lu_fid *lf, const char *name,
511                                    struct thandle *handle,
512                                    struct lustre_capa *capa)
513 {
514         struct dt_object *next = mdd_object_child(pobj);
515         int               rc;
516         ENTRY;
517
518         if (dt_try_as_dir(env, next)) {
519                 struct lu_ucred  *uc = lu_ucred_check(env);
520                 int ignore_quota;
521
522                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
523                 rc = next->do_index_ops->dio_insert(env, next,
524                                                     (struct dt_rec*)lf,
525                                                     (const struct dt_key *)name,
526                                                     handle, capa, ignore_quota);
527         } else {
528                 rc = -ENOTDIR;
529         }
530         RETURN(rc);
531 }
532
533 /* insert named index, add reference if isdir */
534 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
535                               const struct lu_fid *lf, const char *name, int is_dir,
536                               struct thandle *handle, struct lustre_capa *capa)
537 {
538         int               rc;
539         ENTRY;
540
541         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
542         if (rc == 0 && is_dir) {
543                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
544                 mdo_ref_add(env, pobj, handle);
545                 mdd_write_unlock(env, pobj);
546         }
547         RETURN(rc);
548 }
549
550 /* delete named index, drop reference if isdir */
551 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
552                               const char *name, int is_dir, struct thandle *handle,
553                               struct lustre_capa *capa)
554 {
555         int               rc;
556         ENTRY;
557
558         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
559         if (rc == 0 && is_dir) {
560                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
561                 mdo_ref_del(env, pobj, handle);
562                 mdd_write_unlock(env, pobj);
563         }
564
565         RETURN(rc);
566 }
567
568 int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
569                             int reclen, struct thandle *handle)
570 {
571         int rc;
572
573         /* XXX: this is a temporary solution to declare llog changes
574          *      will be fixed in 2.3 with new llog implementation */
575
576         LASSERT(mdd->mdd_capa);
577
578         /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
579          *      have to set the parameter 'size' as INT_MAX or 0 to inform
580          *      OSD that this record write is for a llog write or catalog
581          *      header update, and osd declare function will reserve less
582          *      credits for optimization purpose.
583          *
584          *      Reserve 6 blocks for a llog write, since the llog file is
585          *      usually small, reserve 2 blocks for catalog header update,
586          *      because we know for sure that catalog header is already
587          *      allocated.
588          *
589          *      This hack should be removed in 2.3.
590          */
591
592         /* record itself */
593         rc = dt_declare_record_write(env, mdd->mdd_capa,
594                                      DECLARE_LLOG_WRITE, 0, handle);
595         if (rc)
596                 return rc;
597
598         /* header will be updated as well */
599         rc = dt_declare_record_write(env, mdd->mdd_capa,
600                                      DECLARE_LLOG_WRITE, 0, handle);
601         if (rc)
602                 return rc;
603
604         /* also we should be able to create new plain log */
605         rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
606         if (rc)
607                 return rc;
608
609         /* new record referencing new plain llog */
610         rc = dt_declare_record_write(env, mdd->mdd_capa,
611                                      DECLARE_LLOG_WRITE, 0, handle);
612         if (rc)
613                 return rc;
614
615         /* catalog's header will be updated as well */
616         rc = dt_declare_record_write(env, mdd->mdd_capa,
617                                      DECLARE_LLOG_REWRITE, 0, handle);
618
619         return rc;
620 }
621
622 int mdd_declare_changelog_store(const struct lu_env *env,
623                                 struct mdd_device *mdd,
624                                 const struct lu_name *fname,
625                                 struct thandle *handle)
626 {
627         struct obd_device               *obd = mdd2obd_dev(mdd);
628         struct llog_ctxt                *ctxt;
629         struct llog_changelog_rec       *rec;
630         struct lu_buf                   *buf;
631         int                              reclen;
632         int                              rc;
633
634         /* Not recording */
635         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
636                 return 0;
637
638         reclen = llog_data_len(sizeof(*rec) +
639                                (fname != NULL ? fname->ln_namelen : 0));
640         buf = mdd_buf_alloc(env, reclen);
641         if (buf->lb_buf == NULL)
642                 return -ENOMEM;
643
644         rec = buf->lb_buf;
645         rec->cr_hdr.lrh_len = reclen;
646         rec->cr_hdr.lrh_type = CHANGELOG_REC;
647
648         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
649         if (ctxt == NULL)
650                 return -ENXIO;
651
652         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
653         llog_ctxt_put(ctxt);
654
655         return rc;
656 }
657
658 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
659                                            struct mdd_device *mdd,
660                                            const struct lu_name *tname,
661                                            const struct lu_name *sname,
662                                            struct thandle *handle)
663 {
664         struct obd_device               *obd = mdd2obd_dev(mdd);
665         struct llog_ctxt                *ctxt;
666         struct llog_changelog_ext_rec   *rec;
667         struct lu_buf                   *buf;
668         int                              reclen;
669         int                              rc;
670
671         /* Not recording */
672         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
673                 return 0;
674
675         reclen = llog_data_len(sizeof(*rec) +
676                                (tname != NULL ? tname->ln_namelen : 0) +
677                                (sname != NULL ? 1 + sname->ln_namelen : 0));
678         buf = mdd_buf_alloc(env, reclen);
679         if (buf->lb_buf == NULL)
680                 return -ENOMEM;
681
682         rec = buf->lb_buf;
683         rec->cr_hdr.lrh_len = reclen;
684         rec->cr_hdr.lrh_type = CHANGELOG_REC;
685
686         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
687         if (ctxt == NULL)
688                 return -ENXIO;
689
690         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
691         llog_ctxt_put(ctxt);
692
693         return rc;
694 }
695
696 /** Add a changelog entry \a rec to the changelog llog
697  * \param mdd
698  * \param rec
699  * \param handle - currently ignored since llogs start their own transaction;
700  *                 this will hopefully be fixed in llog rewrite
701  * \retval 0 ok
702  */
703 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
704                         struct llog_changelog_rec *rec, struct thandle *th)
705 {
706         struct obd_device       *obd = mdd2obd_dev(mdd);
707         struct llog_ctxt        *ctxt;
708         int                      rc;
709
710         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
711         rec->cr_hdr.lrh_type = CHANGELOG_REC;
712         rec->cr.cr_time = cl_time();
713
714         spin_lock(&mdd->mdd_cl.mc_lock);
715         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
716          * but as long as the MDD transactions are ordered correctly for e.g.
717          * rename conflicts, I don't think this should matter. */
718         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
719         spin_unlock(&mdd->mdd_cl.mc_lock);
720
721         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
722         if (ctxt == NULL)
723                 return -ENXIO;
724
725         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
726         llog_ctxt_put(ctxt);
727         if (rc > 0)
728                 rc = 0;
729         return rc;
730 }
731
732 /** Add a changelog_ext entry \a rec to the changelog llog
733  * \param mdd
734  * \param rec
735  * \param handle - currently ignored since llogs start their own transaction;
736  *              this will hopefully be fixed in llog rewrite
737  * \retval 0 ok
738  */
739 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
740                             struct llog_changelog_ext_rec *rec,
741                             struct thandle *th)
742 {
743         struct obd_device       *obd = mdd2obd_dev(mdd);
744         struct llog_ctxt        *ctxt;
745         int                      rc;
746
747         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
748         /* llog_lvfs_write_rec sets the llog tail len */
749         rec->cr_hdr.lrh_type = CHANGELOG_REC;
750         rec->cr.cr_time = cl_time();
751
752         spin_lock(&mdd->mdd_cl.mc_lock);
753         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
754          * but as long as the MDD transactions are ordered correctly for e.g.
755          * rename conflicts, I don't think this should matter. */
756         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
757         spin_unlock(&mdd->mdd_cl.mc_lock);
758
759         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
760         if (ctxt == NULL)
761                 return -ENXIO;
762
763         /* nested journal transaction */
764         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
765         llog_ctxt_put(ctxt);
766         if (rc > 0)
767                 rc = 0;
768
769         return rc;
770 }
771
772 /** Store a namespace change changelog record
773  * If this fails, we must fail the whole transaction; we don't
774  * want the change to commit without the log entry.
775  * \param target - mdd_object of change
776  * \param parent - parent dir/object
777  * \param tname - target name string
778  * \param handle - transacion handle
779  */
780 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
781                            enum changelog_rec_type type, unsigned flags,
782                            struct mdd_object *target, struct mdd_object *parent,
783                            const struct lu_name *tname, struct thandle *handle)
784 {
785         struct llog_changelog_rec *rec;
786         struct lu_buf *buf;
787         int reclen;
788         int rc;
789         ENTRY;
790
791         /* Not recording */
792         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
793                 RETURN(0);
794         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
795                 RETURN(0);
796
797         LASSERT(target != NULL);
798         LASSERT(parent != NULL);
799         LASSERT(tname != NULL);
800         LASSERT(handle != NULL);
801
802         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
803         buf = mdd_buf_alloc(env, reclen);
804         if (buf->lb_buf == NULL)
805                 RETURN(-ENOMEM);
806         rec = buf->lb_buf;
807
808         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
809         rec->cr.cr_type = (__u32)type;
810         rec->cr.cr_tfid = *mdo2fid(target);
811         rec->cr.cr_pfid = *mdo2fid(parent);
812         rec->cr.cr_namelen = tname->ln_namelen;
813         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
814
815         target->mod_cltime = cfs_time_current_64();
816
817         rc = mdd_changelog_store(env, mdd, rec, handle);
818         if (rc < 0) {
819                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
820                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
821                         PFID(&rec->cr.cr_pfid));
822                 RETURN(-EFAULT);
823         }
824
825         RETURN(0);
826 }
827
828
829 /** Store a namespace change changelog record
830  * If this fails, we must fail the whole transaction; we don't
831  * want the change to commit without the log entry.
832  * \param target - mdd_object of change
833  * \param tpfid - target parent dir/object fid
834  * \param sfid - source object fid
835  * \param spfid - source parent fid
836  * \param tname - target name string
837  * \param sname - source name string
838  * \param handle - transacion handle
839  */
840 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
841                                       struct mdd_device    *mdd,
842                                       enum changelog_rec_type type,
843                                       unsigned flags,
844                                       struct mdd_object    *target,
845                                       const struct lu_fid  *tpfid,
846                                       const struct lu_fid  *sfid,
847                                       const struct lu_fid  *spfid,
848                                       const struct lu_name *tname,
849                                       const struct lu_name *sname,
850                                       struct thandle *handle)
851 {
852         struct llog_changelog_ext_rec *rec;
853         struct lu_buf *buf;
854         int reclen;
855         int rc;
856         ENTRY;
857
858         /* Not recording */
859         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
860                 RETURN(0);
861         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
862                 RETURN(0);
863
864         LASSERT(sfid != NULL);
865         LASSERT(tpfid != NULL);
866         LASSERT(tname != NULL);
867         LASSERT(handle != NULL);
868
869         reclen = llog_data_len(sizeof(*rec) +
870                                sname != NULL ? 1 + sname->ln_namelen : 0);
871         buf = mdd_buf_alloc(env, reclen);
872         if (buf->lb_buf == NULL)
873                 RETURN(-ENOMEM);
874         rec = buf->lb_buf;
875
876         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
877         rec->cr.cr_type = (__u32)type;
878         rec->cr.cr_pfid = *tpfid;
879         rec->cr.cr_sfid = *sfid;
880         rec->cr.cr_spfid = *spfid;
881         rec->cr.cr_namelen = tname->ln_namelen;
882         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
883         if (sname) {
884                 rec->cr.cr_name[tname->ln_namelen] = '\0';
885                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
886                         sname->ln_namelen);
887                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
888         }
889
890         if (likely(target != NULL)) {
891                 rec->cr.cr_tfid = *mdo2fid(target);
892                 target->mod_cltime = cfs_time_current_64();
893         } else {
894                 fid_zero(&rec->cr.cr_tfid);
895         }
896
897         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
898         if (rc < 0) {
899                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
900                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
901                 return -EFAULT;
902         }
903
904         return 0;
905 }
906
907 static int mdd_declare_link(const struct lu_env *env,
908                             struct mdd_device *mdd,
909                             struct mdd_object *p,
910                             struct mdd_object *c,
911                             const struct lu_name *name,
912                             struct thandle *handle)
913 {
914         int rc;
915
916         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
917         if (rc)
918                 return rc;
919
920         rc = mdo_declare_ref_add(env, c, handle);
921         if (rc)
922                 return rc;
923
924         rc = mdo_declare_attr_set(env, p, NULL, handle);
925         if (rc)
926                 return rc;
927
928         rc = mdo_declare_attr_set(env, c, NULL, handle);
929         if (rc)
930                 return rc;
931
932         rc = mdd_declare_links_add(env, c, handle);
933         if (rc)
934                 return rc;
935
936         rc = mdd_declare_changelog_store(env, mdd, name, handle);
937
938         return rc;
939 }
940
941 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
942                     struct md_object *src_obj, const struct lu_name *lname,
943                     struct md_attr *ma)
944 {
945         const char *name = lname->ln_name;
946         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
947         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
948         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
949         struct mdd_device *mdd = mdo2mdd(src_obj);
950         struct dynlock_handle *dlh;
951         struct thandle *handle;
952         int rc;
953         ENTRY;
954
955         handle = mdd_trans_create(env, mdd);
956         if (IS_ERR(handle))
957                 GOTO(out_pending, rc = PTR_ERR(handle));
958
959         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
960         if (rc)
961                 GOTO(stop, rc);
962
963         rc = mdd_trans_start(env, mdd, handle);
964         if (rc)
965                 GOTO(stop, rc);
966
967         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
968         if (dlh == NULL)
969                 GOTO(out_trans, rc = -ENOMEM);
970         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
971
972         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
973         if (rc)
974                 GOTO(out_unlock, rc);
975
976         rc = mdo_ref_add(env, mdd_sobj, handle);
977         if (rc)
978                 GOTO(out_unlock, rc);
979
980
981         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
982                                      name, handle,
983                                      mdd_object_capa(env, mdd_tobj));
984         if (rc != 0) {
985                 mdo_ref_del(env, mdd_sobj, handle);
986                 GOTO(out_unlock, rc);
987         }
988
989         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
990         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
991
992         la->la_valid = LA_CTIME | LA_MTIME;
993         rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
994         if (rc)
995                 GOTO(out_unlock, rc);
996
997         la->la_valid = LA_CTIME;
998         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
999         if (rc == 0) {
1000                 mdd_links_add(env, mdd_sobj,
1001                               mdo2fid(mdd_tobj), lname, handle, 0);
1002         }
1003
1004         EXIT;
1005 out_unlock:
1006         mdd_write_unlock(env, mdd_sobj);
1007         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
1008 out_trans:
1009         if (rc == 0)
1010                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1011                                             mdd_tobj, lname, handle);
1012 stop:
1013         mdd_trans_stop(env, mdd, rc, handle);
1014 out_pending:
1015         return rc;
1016 }
1017
1018 int mdd_declare_finish_unlink(const struct lu_env *env,
1019                               struct mdd_object *obj,
1020                               struct md_attr *ma,
1021                               struct thandle *handle)
1022 {
1023         int     rc;
1024
1025         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1026         if (rc)
1027                 return rc;
1028
1029         return mdo_declare_destroy(env, obj, handle);
1030 }
1031
1032 /* caller should take a lock before calling */
1033 int mdd_finish_unlink(const struct lu_env *env,
1034                       struct mdd_object *obj, struct md_attr *ma,
1035                       struct thandle *th)
1036 {
1037         int rc = 0;
1038         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1039         ENTRY;
1040
1041         LASSERT(mdd_write_locked(env, obj) != 0);
1042
1043         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
1044                 obj->mod_flags |= DEAD_OBJ;
1045                 /* add new orphan and the object
1046                  * will be deleted during mdd_close() */
1047                 if (obj->mod_count) {
1048                         rc = __mdd_orphan_add(env, obj, th);
1049                         if (rc == 0)
1050                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1051                                         "orphan list, open count = %d\n",
1052                                         PFID(mdd_object_fid(obj)),
1053                                         obj->mod_count);
1054                         else
1055                                 CERROR("Object "DFID" fail to be an orphan, "
1056                                        "open count = %d, maybe cause failed "
1057                                        "open replay\n",
1058                                         PFID(mdd_object_fid(obj)),
1059                                         obj->mod_count);
1060                 } else {
1061                         rc = mdo_destroy(env, obj, th);
1062                 }
1063         }
1064
1065         RETURN(rc);
1066 }
1067
1068 /*
1069  * pobj maybe NULL
1070  * has mdd_write_lock on cobj already, but not on pobj yet
1071  */
1072 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1073                             struct mdd_object *cobj, struct lu_attr *cattr)
1074 {
1075         int rc;
1076         ENTRY;
1077
1078         rc = mdd_may_delete(env, pobj, cobj, cattr, NULL, 1, 1);
1079
1080         RETURN(rc);
1081 }
1082
1083 static inline int mdd_declare_links_del(const struct lu_env *env,
1084                                         struct mdd_object *c,
1085                                         struct thandle *handle)
1086 {
1087         int rc = 0;
1088
1089         /* For directory, the linkEA will be removed together with the object. */
1090         if (!S_ISDIR(mdd_object_type(c)))
1091                 rc = mdd_declare_links_add(env, c, handle);
1092
1093         return rc;
1094 }
1095
1096 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1097                               struct mdd_object *p, struct mdd_object *c,
1098                               const struct lu_name *name, struct md_attr *ma,
1099                               struct thandle *handle)
1100 {
1101         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1102         int rc;
1103
1104         rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1105         if (rc)
1106                 return rc;
1107
1108         rc = mdo_declare_ref_del(env, p, handle);
1109         if (rc)
1110                 return rc;
1111
1112         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1113         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1114         la->la_valid = LA_CTIME | LA_MTIME;
1115         rc = mdo_declare_attr_set(env, p, la, handle);
1116         if (rc)
1117                 return rc;
1118
1119         if (c != NULL) {
1120                 rc = mdo_declare_ref_del(env, c, handle);
1121                 if (rc)
1122                         return rc;
1123
1124                 rc = mdo_declare_ref_del(env, c, handle);
1125                 if (rc)
1126                         return rc;
1127
1128                 rc = mdo_declare_attr_set(env, c, NULL, handle);
1129                 if (rc)
1130                         return rc;
1131
1132                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1133                 if (rc)
1134                         return rc;
1135
1136                 rc = mdd_declare_links_del(env, c, handle);
1137                 if (rc != 0)
1138                         return rc;
1139
1140                 /* FIXME: need changelog for remove entry */
1141                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1142         }
1143
1144         return rc;
1145 }
1146
1147 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1148                       struct md_object *cobj, const struct lu_name *lname,
1149                       struct md_attr *ma)
1150 {
1151         const char *name = lname->ln_name;
1152         struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
1153         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1154         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1155         struct mdd_object *mdd_cobj = NULL;
1156         struct mdd_device *mdd = mdo2mdd(pobj);
1157         struct dynlock_handle *dlh;
1158         struct thandle    *handle;
1159         int rc, is_dir = 0;
1160         ENTRY;
1161
1162         /* cobj == NULL means only delete name entry */
1163         if (likely(cobj != NULL)) {
1164                 mdd_cobj = md2mdd_obj(cobj);
1165                 if (mdd_object_exists(mdd_cobj) == 0)
1166                         RETURN(-ENOENT);
1167                 /* currently it is assume, it could only delete
1168                  * name entry of remote directory */
1169                 is_dir = 1;
1170         }
1171
1172         handle = mdd_trans_create(env, mdd);
1173         if (IS_ERR(handle))
1174                 RETURN(PTR_ERR(handle));
1175
1176         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1177                                 lname, ma, handle);
1178         if (rc)
1179                 GOTO(stop, rc);
1180
1181         rc = mdd_trans_start(env, mdd, handle);
1182         if (rc)
1183                 GOTO(stop, rc);
1184
1185         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1186         if (dlh == NULL)
1187                 GOTO(stop, rc = -ENOMEM);
1188
1189         if (likely(mdd_cobj != NULL)) {
1190                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1191
1192                 /* fetch cattr */
1193                 rc = mdd_la_get(env, mdd_cobj, cattr,
1194                                 mdd_object_capa(env, mdd_cobj));
1195                 if (rc)
1196                         GOTO(cleanup, rc);
1197
1198                 is_dir = S_ISDIR(cattr->la_mode);
1199
1200         }
1201
1202         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
1203         if (rc)
1204                 GOTO(cleanup, rc);
1205
1206         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1207                                 mdd_object_capa(env, mdd_pobj));
1208         if (rc)
1209                 GOTO(cleanup, rc);
1210
1211         if (likely(mdd_cobj != NULL)) {
1212                 rc = mdo_ref_del(env, mdd_cobj, handle);
1213                 if (rc != 0) {
1214                         __mdd_index_insert_only(env, mdd_pobj,
1215                                                 mdo2fid(mdd_cobj),
1216                                                 name, handle,
1217                                                 mdd_object_capa(env, mdd_pobj));
1218                         GOTO(cleanup, rc);
1219                 }
1220
1221                 if (is_dir)
1222                         /* unlink dot */
1223                         mdo_ref_del(env, mdd_cobj, handle);
1224
1225                 /* fetch updated nlink */
1226                 rc = mdd_la_get(env, mdd_cobj, cattr,
1227                                 mdd_object_capa(env, mdd_cobj));
1228                 if (rc)
1229                         GOTO(cleanup, rc);
1230         }
1231
1232         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1233         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1234
1235         la->la_valid = LA_CTIME | LA_MTIME;
1236         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1237         if (rc)
1238                 GOTO(cleanup, rc);
1239
1240         /* Enough for only unlink the entry */
1241         if (unlikely(mdd_cobj == NULL)) {
1242                 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1243                 GOTO(stop, rc);
1244         }
1245
1246         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1247                 /* update ctime of an unlinked file only if it is still
1248                  * opened or a link still exists */
1249                 la->la_valid = LA_CTIME;
1250                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1251                 if (rc)
1252                         GOTO(cleanup, rc);
1253         }
1254
1255         /* XXX: this transfer to ma will be removed with LOD/OSP */
1256         ma->ma_attr = *cattr;
1257         ma->ma_valid |= MA_INODE;
1258         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1259
1260         /* fetch updated nlink */
1261         if (rc == 0)
1262                 rc = mdd_la_get(env, mdd_cobj, cattr,
1263                                 mdd_object_capa(env, mdd_cobj));
1264
1265         if (!is_dir)
1266                 /* old files may not have link ea; ignore errors */
1267                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1268
1269         /* if object is removed then we can't get its attrs, use last get */
1270         if (cattr->la_nlink == 0) {
1271                 ma->ma_attr = *cattr;
1272                 ma->ma_valid |= MA_INODE;
1273         }
1274         EXIT;
1275 cleanup:
1276         mdd_write_unlock(env, mdd_cobj);
1277         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1278         if (rc == 0) {
1279                 int cl_flags;
1280
1281                 cl_flags = (cattr->la_nlink == 0) ? CLF_UNLINK_LAST : 0;
1282                 if ((ma->ma_valid & MA_HSM) &&
1283                     (ma->ma_hsm.mh_flags & HS_EXISTS))
1284                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1285
1286                 rc = mdd_changelog_ns_store(env, mdd,
1287                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1288                         mdd_cobj, mdd_pobj, lname, handle);
1289         }
1290
1291 stop:
1292         mdd_trans_stop(env, mdd, rc, handle);
1293
1294         return rc;
1295 }
1296
1297 /*
1298  * The permission has been checked when obj created, no need check again.
1299  */
1300 static int mdd_cd_sanity_check(const struct lu_env *env,
1301                                struct mdd_object *obj)
1302 {
1303         ENTRY;
1304
1305         /* EEXIST check */
1306         if (!obj || mdd_is_dead_obj(obj))
1307                 RETURN(-ENOENT);
1308
1309         RETURN(0);
1310
1311 }
1312
1313 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1314                            struct md_object *cobj, const struct md_op_spec *spec,
1315                            struct md_attr *ma)
1316 {
1317         struct mdd_device *mdd = mdo2mdd(cobj);
1318         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1319         struct mdd_object *son = md2mdd_obj(cobj);
1320         struct thandle    *handle;
1321         const struct lu_buf *buf;
1322         struct lu_attr    *attr = &mdd_env_info(env)->mti_cattr;
1323         int                rc;
1324         ENTRY;
1325
1326         /* do not let users to create stripes via .lustre/
1327          * mdd_obf_setup() sets IMMUTE_OBJ on this directory */
1328         if (pobj && mdd_pobj->mod_flags & IMMUTE_OBJ)
1329                 RETURN(-ENOENT);
1330
1331         rc = mdd_cd_sanity_check(env, son);
1332         if (rc)
1333                 RETURN(rc);
1334
1335         if (!md_should_create(spec->sp_cr_flags))
1336                 RETURN(0);
1337
1338         /*
1339          * there are following use cases for this function:
1340          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1341          *    striping can be specified or not
1342          * 2) CMD?
1343          */
1344         rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
1345         if (rc)
1346                 RETURN(rc);
1347
1348         /* calling ->ah_make_hint() is used to transfer information from parent */
1349         mdd_object_make_hint(env, mdd_pobj, son, attr);
1350
1351         handle = mdd_trans_create(env, mdd);
1352         if (IS_ERR(handle))
1353                 GOTO(out_free, rc = PTR_ERR(handle));
1354
1355         /*
1356          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1357          * Should this be fixed?
1358          */
1359         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
1360                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1361                spec->sp_cr_flags, spec->no_create);
1362
1363         if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
1364                 /* replay case or lfs setstripe */
1365                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1366                                         spec->u.sp_ea.eadatalen);
1367         } else {
1368                 buf = &LU_BUF_NULL;
1369         }
1370
1371         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1372                                   XATTR_NAME_LOV, 0, handle);
1373         if (rc)
1374                 GOTO(stop, rc);
1375
1376         rc = mdd_trans_start(env, mdd, handle);
1377         if (rc)
1378                 GOTO(stop, rc);
1379
1380         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1381                           0, handle, mdd_object_capa(env, son));
1382 stop:
1383         mdd_trans_stop(env, mdd, rc, handle);
1384 out_free:
1385         RETURN(rc);
1386 }
1387
1388 /* Get fid from name and parent */
1389 static int
1390 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1391              const struct lu_name *lname, struct lu_fid* fid, int mask)
1392 {
1393         const char          *name = lname->ln_name;
1394         const struct dt_key *key = (const struct dt_key *)name;
1395         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1396         struct mdd_device   *m = mdo2mdd(pobj);
1397         struct dt_object    *dir = mdd_object_child(mdd_obj);
1398         int rc;
1399         ENTRY;
1400
1401         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1402                 RETURN(-ESTALE);
1403
1404         if (mdd_object_remote(mdd_obj)) {
1405                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
1406                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
1407         } else if (!mdd_object_exists(mdd_obj)) {
1408                 RETURN(-ESTALE);
1409         }
1410
1411         /* The common filename length check. */
1412         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1413                 RETURN(-ENAMETOOLONG);
1414
1415         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1416                                             MOR_TGT_PARENT);
1417         if (rc)
1418                 RETURN(rc);
1419
1420         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1421                    dt_try_as_dir(env, dir))) {
1422
1423                 rc = dir->do_index_ops->dio_lookup(env, dir,
1424                                                  (struct dt_rec *)fid, key,
1425                                                  mdd_object_capa(env, mdd_obj));
1426                 if (rc > 0)
1427                         rc = 0;
1428                 else if (rc == 0)
1429                         rc = -ENOENT;
1430         } else
1431                 rc = -ENOTDIR;
1432
1433         RETURN(rc);
1434 }
1435
1436 static int mdd_declare_object_initialize(const struct lu_env *env,
1437                                          struct mdd_object *parent,
1438                                          struct mdd_object *child,
1439                                          struct lu_attr *attr,
1440                                          struct thandle *handle)
1441 {
1442         int rc;
1443         ENTRY;
1444
1445         /*
1446          * inode mode has been set in creation time, and it's based on umask,
1447          * la_mode and acl, don't set here again! (which will go wrong
1448          * because below function doesn't consider umask).
1449          * I'd suggest set all object attributes in creation time, see above.
1450          */
1451         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1452         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1453         rc = mdo_declare_attr_set(env, child, attr, handle);
1454         attr->la_valid |= LA_MODE | LA_TYPE;
1455         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1456                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1457                                               dot, handle);
1458                 if (rc == 0)
1459                         rc = mdo_declare_ref_add(env, child, handle);
1460
1461                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1462                                               dotdot, handle);
1463         }
1464
1465         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1466                         fid_is_dot_lustre(mdo2fid(child)) ||
1467                         fid_is_root(mdo2fid(child))))
1468                 mdd_declare_links_add(env, child, handle);
1469
1470         RETURN(rc);
1471 }
1472
1473 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1474                           const struct lu_name *lname, struct mdd_object *child,
1475                           const struct lu_attr *attr, struct thandle *handle,
1476                           const struct md_op_spec *spec)
1477 {
1478         int rc;
1479         ENTRY;
1480
1481         /*
1482          * Update attributes for child.
1483          *
1484          * FIXME:
1485          *  (1) the valid bits should be converted between Lustre and Linux;
1486          *  (2) maybe, the child attributes should be set in OSD when creation.
1487          */
1488
1489         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1490         /* arguments are supposed to stay the same */
1491         if (S_ISDIR(attr->la_mode)) {
1492                 /* Add "." and ".." for newly created dir */
1493                 mdo_ref_add(env, child, handle);
1494                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1495                                              dot, handle, BYPASS_CAPA);
1496                 if (rc == 0)
1497                         rc = __mdd_index_insert_only(env, child, pfid,
1498                                                      dotdot, handle,
1499                                                      BYPASS_CAPA);
1500                 if (rc != 0)
1501                         mdo_ref_del(env, child, handle);
1502         }
1503
1504         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1505                         fid_is_dot_lustre(mdo2fid(child)) ||
1506                         fid_is_root(mdo2fid(child))))
1507                 mdd_links_add(env, child, pfid, lname, handle, 1);
1508
1509         RETURN(rc);
1510 }
1511
1512 /* has not lock on pobj yet */
1513 static int mdd_create_sanity_check(const struct lu_env *env,
1514                                    struct md_object *pobj,
1515                                    struct lu_attr *pattr,
1516                                    const struct lu_name *lname,
1517                                    struct lu_attr *cattr,
1518                                    struct md_op_spec *spec)
1519 {
1520         struct mdd_thread_info *info = mdd_env_info(env);
1521         struct lu_fid     *fid       = &info->mti_fid;
1522         struct mdd_object *obj       = md2mdd_obj(pobj);
1523         struct mdd_device *m         = mdo2mdd(pobj);
1524         int rc;
1525         ENTRY;
1526
1527         /* EEXIST check */
1528         if (mdd_is_dead_obj(obj))
1529                 RETURN(-ENOENT);
1530
1531         /*
1532          * In some cases this lookup is not needed - we know before if name
1533          * exists or not because MDT performs lookup for it.
1534          * name length check is done in lookup.
1535          */
1536         if (spec->sp_cr_lookup) {
1537                 /*
1538                  * Check if the name already exist, though it will be checked in
1539                  * _index_insert also, for avoiding rolling back if exists
1540                  * _index_insert.
1541                  */
1542                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1543                                          MAY_WRITE | MAY_EXEC);
1544                 if (rc != -ENOENT)
1545                         RETURN(rc ? : -EEXIST);
1546         } else {
1547                 /*
1548                  * Check WRITE permission for the parent.
1549                  * EXEC permission have been checked
1550                  * when lookup before create already.
1551                  */
1552                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1553                                                     MOR_TGT_PARENT);
1554                 if (rc)
1555                         RETURN(rc);
1556         }
1557
1558         /* sgid check */
1559         if (pattr->la_mode & S_ISGID) {
1560                 cattr->la_gid = pattr->la_gid;
1561                 if (S_ISDIR(cattr->la_mode)) {
1562                         cattr->la_mode |= S_ISGID;
1563                         cattr->la_valid |= LA_MODE;
1564                 }
1565         }
1566
1567         switch (cattr->la_mode & S_IFMT) {
1568         case S_IFLNK: {
1569                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1570
1571                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1572                         RETURN(-ENAMETOOLONG);
1573                 else
1574                         RETURN(0);
1575         }
1576         case S_IFDIR:
1577         case S_IFREG:
1578         case S_IFCHR:
1579         case S_IFBLK:
1580         case S_IFIFO:
1581         case S_IFSOCK:
1582                 rc = 0;
1583                 break;
1584         default:
1585                 rc = -EINVAL;
1586                 break;
1587         }
1588         RETURN(rc);
1589 }
1590
1591 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1592                               struct mdd_object *p, struct mdd_object *c,
1593                               const struct lu_name *name,
1594                               struct lu_attr *attr,
1595                               int got_def_acl,
1596                               struct thandle *handle,
1597                               const struct md_op_spec *spec)
1598 {
1599         int rc;
1600
1601         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1602         if (rc)
1603                 GOTO(out, rc);
1604
1605 #ifdef CONFIG_FS_POSIX_ACL
1606         if (got_def_acl > 0) {
1607                 struct lu_buf *acl_buf;
1608
1609                 acl_buf = mdd_buf_get(env, NULL, got_def_acl);
1610                 /* if dir, then can inherit default ACl */
1611                 if (S_ISDIR(attr->la_mode)) {
1612                         rc = mdo_declare_xattr_set(env, c, acl_buf,
1613                                                    XATTR_NAME_ACL_DEFAULT,
1614                                                    0, handle);
1615                         if (rc)
1616                                 GOTO(out, rc);
1617                 }
1618
1619                 rc = mdo_declare_attr_set(env, c, attr, handle);
1620                 if (rc)
1621                         GOTO(out, rc);
1622
1623                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1624                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1625                 if (rc)
1626                         GOTO(out, rc);
1627         }
1628 #endif
1629
1630         if (S_ISDIR(attr->la_mode)) {
1631                 rc = mdo_declare_ref_add(env, p, handle);
1632                 if (rc)
1633                         GOTO(out, rc);
1634         }
1635
1636         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
1637         if (rc)
1638                 GOTO(out, rc);
1639
1640         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1641                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1642         else
1643                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1644                                               name->ln_name, handle);
1645         if (rc)
1646                 GOTO(out, rc);
1647
1648         /* replay case, create LOV EA from client data */
1649         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1650                 const struct lu_buf *buf;
1651
1652                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1653                                         spec->u.sp_ea.eadatalen);
1654                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1655                                            0, handle);
1656                 if (rc)
1657                         GOTO(out, rc);
1658         }
1659
1660         if (S_ISLNK(attr->la_mode)) {
1661                 rc = dt_declare_record_write(env, mdd_object_child(c),
1662                                              strlen(spec->u.sp_symname), 0,
1663                                              handle);
1664                 if (rc)
1665                         GOTO(out, rc);
1666         }
1667
1668         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1669                 rc = mdo_declare_attr_set(env, p, attr, handle);
1670                 if (rc)
1671                         return rc;
1672         }
1673
1674         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1675         if (rc)
1676                 return rc;
1677
1678 out:
1679         return rc;
1680 }
1681
1682 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1683                         struct lu_attr *la, struct lu_buf *acl_buf,
1684                         int *got_def_acl, int *reset_acl)
1685 {
1686         int     rc;
1687         ENTRY;
1688
1689         if (S_ISLNK(la->la_mode))
1690                 RETURN(0);
1691
1692         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
1693         rc = mdo_xattr_get(env, pobj, acl_buf,
1694                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
1695         mdd_read_unlock(env, pobj);
1696         if (rc > 0) {
1697                 /* If there are default ACL, fix mode by default ACL */
1698                 *got_def_acl = rc;
1699                 acl_buf->lb_len = rc;
1700                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
1701                 if (rc < 0)
1702                         RETURN(rc);
1703                 *reset_acl = rc;
1704         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
1705                 /* If there are no default ACL, fix mode by mask */
1706                 struct lu_ucred *uc = lu_ucred(env);
1707                 la->la_mode &= ~uc->uc_umask;
1708                 rc = 0;
1709         }
1710
1711         RETURN(rc);
1712 }
1713
1714 /*
1715  * Create object and insert it into namespace.
1716  */
1717 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
1718                       const struct lu_name *lname, struct md_object *child,
1719                       struct md_op_spec *spec, struct md_attr* ma)
1720 {
1721         struct mdd_thread_info  *info = mdd_env_info(env);
1722         struct lu_attr          *la = &info->mti_la_for_fix;
1723         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
1724         struct mdd_object       *son = md2mdd_obj(child);
1725         struct mdd_device       *mdd = mdo2mdd(pobj);
1726         struct lu_attr          *attr = &ma->ma_attr;
1727         struct thandle          *handle;
1728         struct lu_attr          *pattr = &info->mti_pattr;
1729         struct lu_buf           acl_buf;
1730         struct dynlock_handle   *dlh;
1731         const char              *name = lname->ln_name;
1732         int                      rc, created = 0, initialized = 0, inserted = 0;
1733         int                      got_def_acl = 0;
1734         int                      reset_acl = 0;
1735         ENTRY;
1736
1737         /*
1738          * Two operations have to be performed:
1739          *
1740          *  - an allocation of a new object (->do_create()), and
1741          *
1742          *  - an insertion into a parent index (->dio_insert()).
1743          *
1744          * Due to locking, operation order is not important, when both are
1745          * successful, *but* error handling cases are quite different:
1746          *
1747          *  - if insertion is done first, and following object creation fails,
1748          *  insertion has to be rolled back, but this operation might fail
1749          *  also leaving us with dangling index entry.
1750          *
1751          *  - if creation is done first, is has to be undone if insertion
1752          *  fails, leaving us with leaked space, which is neither good, nor
1753          *  fatal.
1754          *
1755          * It seems that creation-first is simplest solution, but it is
1756          * sub-optimal in the frequent
1757          *
1758          *         $ mkdir foo
1759          *         $ mkdir foo
1760          *
1761          * case, because second mkdir is bound to create object, only to
1762          * destroy it immediately.
1763          *
1764          * To avoid this follow local file systems that do double lookup:
1765          *
1766          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1767          *
1768          *     1. create            (mdd_object_create_internal())
1769          *
1770          *     2. insert            (__mdd_index_insert(), lookup again)
1771          */
1772
1773         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
1774         if (rc != 0)
1775                 RETURN(rc);
1776
1777         /* Sanity checks before big job. */
1778         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
1779         if (rc)
1780                 RETURN(rc);
1781
1782         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
1783                 GOTO(out_free, rc = -EINPROGRESS);
1784
1785         acl_buf.lb_buf = info->mti_xattr_buf;
1786         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
1787         rc = mdd_acl_init(env, mdd_pobj, attr, &acl_buf, &got_def_acl,
1788                           &reset_acl);
1789         if (rc < 0)
1790                 GOTO(out_free, rc);
1791
1792         mdd_object_make_hint(env, mdd_pobj, son, attr);
1793
1794         handle = mdd_trans_create(env, mdd);
1795         if (IS_ERR(handle))
1796                 GOTO(out_free, rc = PTR_ERR(handle));
1797
1798         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
1799                                 got_def_acl, handle, spec);
1800         if (rc)
1801                 GOTO(out_stop, rc);
1802
1803         rc = mdd_trans_start(env, mdd, handle);
1804         if (rc)
1805                 GOTO(out_stop, rc);
1806
1807         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1808         if (dlh == NULL)
1809                 GOTO(out_trans, rc = -ENOMEM);
1810
1811         mdd_write_lock(env, son, MOR_TGT_CHILD);
1812         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
1813         if (rc) {
1814                 mdd_write_unlock(env, son);
1815                 GOTO(cleanup, rc);
1816         }
1817
1818         created = 1;
1819
1820 #ifdef CONFIG_FS_POSIX_ACL
1821         if (got_def_acl) {
1822                 /* set default acl */
1823                 if (S_ISDIR(attr->la_mode)) {
1824                         LASSERTF(acl_buf.lb_len  == got_def_acl,
1825                                  "invalid acl_buf: %p:%d got_def %d\n",
1826                                  acl_buf.lb_buf, (int)acl_buf.lb_len,
1827                                  got_def_acl);
1828                         rc = mdo_xattr_set(env, son, &acl_buf,
1829                                            XATTR_NAME_ACL_DEFAULT, 0,
1830                                            handle, BYPASS_CAPA);
1831                         if (rc) {
1832                                 mdd_write_unlock(env, son);
1833                                 GOTO(cleanup, rc);
1834                         }
1835                 }
1836
1837                 /* set its own acl */
1838                 if (reset_acl) {
1839                         LASSERTF(acl_buf.lb_buf != NULL && acl_buf.lb_len != 0,
1840                                  "invalid acl_buf %p:%d\n", acl_buf.lb_buf,
1841                                  (int)acl_buf.lb_len);
1842                         rc = mdo_xattr_set(env, son, &acl_buf,
1843                                            XATTR_NAME_ACL_ACCESS,
1844                                            0, handle, BYPASS_CAPA);
1845                         if (rc) {
1846                                 mdd_write_unlock(env, son);
1847                                 GOTO(cleanup, rc);
1848                         }
1849                 }
1850         }
1851 #endif
1852
1853         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
1854                                    son, attr, handle, spec);
1855
1856         /*
1857          * in case of replay we just set LOVEA provided by the client
1858          * XXX: I think it would be interesting to try "old" way where
1859          *      MDT calls this xattr_set(LOV) in a different transaction.
1860          *      probably this way we code can be made better.
1861          */
1862         if (rc == 0 && (spec->no_create ||
1863                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
1864                 const struct lu_buf *buf;
1865
1866                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1867                                 spec->u.sp_ea.eadatalen);
1868                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
1869                                 BYPASS_CAPA);
1870         }
1871
1872         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1873                 rc = __mdd_orphan_add(env, son, handle);
1874
1875         mdd_write_unlock(env, son);
1876
1877         if (rc != 0)
1878                 /*
1879                  * Object has no links, so it will be destroyed when last
1880                  * reference is released. (XXX not now.)
1881                  */
1882                 GOTO(cleanup, rc);
1883
1884         initialized = 1;
1885
1886         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
1887                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1888                                         name, S_ISDIR(attr->la_mode), handle,
1889                                         mdd_object_capa(env, mdd_pobj));
1890
1891         if (rc != 0)
1892                 GOTO(cleanup, rc);
1893
1894         inserted = 1;
1895
1896         if (S_ISLNK(attr->la_mode)) {
1897                 struct lu_ucred  *uc = lu_ucred_assert(env);
1898                 struct dt_object *dt = mdd_object_child(son);
1899                 const char *target_name = spec->u.sp_symname;
1900                 int sym_len = strlen(target_name);
1901                 const struct lu_buf *buf;
1902                 loff_t pos = 0;
1903
1904                 buf = mdd_buf_get_const(env, target_name, sym_len);
1905                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1906                                                 mdd_object_capa(env, son),
1907                                                 uc->uc_cap &
1908                                                 CFS_CAP_SYS_RESOURCE_MASK);
1909
1910                 if (rc == sym_len)
1911                         rc = 0;
1912                 else
1913                         GOTO(cleanup, rc = -EFAULT);
1914         }
1915
1916         /* volatile file creation does not update parent directory times */
1917         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1918                 GOTO(cleanup, rc = 0);
1919
1920         /* update parent directory mtime/ctime */
1921         *la = *attr;
1922         la->la_valid = LA_CTIME | LA_MTIME;
1923         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1924         if (rc)
1925                 GOTO(cleanup, rc);
1926
1927         EXIT;
1928 cleanup:
1929         if (rc != 0 && created != 0) {
1930                 int rc2;
1931
1932                 if (inserted != 0) {
1933                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1934                                 rc2 = __mdd_orphan_del(env, son, handle);
1935                         else
1936                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1937                                                          S_ISDIR(attr->la_mode),
1938                                                          handle, BYPASS_CAPA);
1939                         if (rc2 != 0)
1940                                 goto out_stop;
1941                 }
1942
1943                 mdd_write_lock(env, son, MOR_TGT_CHILD);
1944                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
1945                         /* Drop the reference, no need to delete "."/"..",
1946                          * because the object to be destroied directly. */
1947                         rc2 = mdo_ref_del(env, son, handle);
1948                         if (rc2 != 0) {
1949                                 mdd_write_unlock(env, son);
1950                                 goto out_stop;
1951                         }
1952                 }
1953
1954                 rc2 = mdo_ref_del(env, son, handle);
1955                 if (rc2 != 0) {
1956                         mdd_write_unlock(env, son);
1957                         goto out_stop;
1958                 }
1959
1960                 mdo_destroy(env, son, handle);
1961                 mdd_write_unlock(env, son);
1962         }
1963
1964         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1965 out_trans:
1966         if (rc == 0 && fid_is_client_mdt_visible(mdo2fid(son)))
1967                 rc = mdd_changelog_ns_store(env, mdd,
1968                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
1969                         S_ISREG(attr->la_mode) ? CL_CREATE :
1970                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
1971                         0, son, mdd_pobj, lname, handle);
1972 out_stop:
1973         mdd_trans_stop(env, mdd, rc, handle);
1974 out_free:
1975         /* The child object shouldn't be cached anymore */
1976         if (rc)
1977                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1978                             &child->mo_lu.lo_header->loh_flags);
1979         return rc;
1980 }
1981
1982 /*
1983  * Get locks on parents in proper order
1984  * RETURN: < 0 - error, rename_order if successful
1985  */
1986 enum rename_order {
1987         MDD_RN_SAME,
1988         MDD_RN_SRCTGT,
1989         MDD_RN_TGTSRC
1990 };
1991
1992 static int mdd_rename_order(const struct lu_env *env,
1993                             struct mdd_device *mdd,
1994                             struct mdd_object *src_pobj,
1995                             struct mdd_object *tgt_pobj)
1996 {
1997         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1998         int rc;
1999         ENTRY;
2000
2001         if (src_pobj == tgt_pobj)
2002                 RETURN(MDD_RN_SAME);
2003
2004         /* compared the parent child relationship of src_p&tgt_p */
2005         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2006                 rc = MDD_RN_SRCTGT;
2007         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2008                 rc = MDD_RN_TGTSRC;
2009         } else {
2010                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2011                 if (rc == -EREMOTE)
2012                         rc = 0;
2013
2014                 if (rc == 1)
2015                         rc = MDD_RN_TGTSRC;
2016                 else if (rc == 0)
2017                         rc = MDD_RN_SRCTGT;
2018         }
2019
2020         RETURN(rc);
2021 }
2022
2023 /* has not mdd_write{read}_lock on any obj yet. */
2024 static int mdd_rename_sanity_check(const struct lu_env *env,
2025                                    struct mdd_object *src_pobj,
2026                                    struct mdd_object *tgt_pobj,
2027                                    struct mdd_object *sobj,
2028                                    struct mdd_object *tobj,
2029                                    struct lu_attr *so_attr,
2030                                    struct lu_attr *tg_attr)
2031 {
2032         int rc = 0;
2033         ENTRY;
2034
2035         /* XXX: when get here, sobj must NOT be NULL,
2036          * the other case has been processed in cml_rename
2037          * before mdd_rename and enable MDS_PERM_BYPASS. */
2038         LASSERT(sobj);
2039
2040         rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
2041         if (rc)
2042                 RETURN(rc);
2043
2044         /* XXX: when get here, "tobj == NULL" means tobj must
2045          * NOT exist (neither on remote MDS, such case has been
2046          * processed in cml_rename before mdd_rename and enable
2047          * MDS_PERM_BYPASS).
2048          * So check may_create, but not check may_unlink. */
2049         if (!tobj)
2050                 rc = mdd_may_create(env, tgt_pobj, NULL,
2051                                     (src_pobj != tgt_pobj), 0);
2052         else
2053                 rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
2054                                     (src_pobj != tgt_pobj), 1);
2055
2056         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2057             S_ISDIR(so_attr->la_mode))
2058                 rc = __mdd_may_link(env, tgt_pobj);
2059
2060         RETURN(rc);
2061 }
2062
2063 static int mdd_declare_rename(const struct lu_env *env,
2064                               struct mdd_device *mdd,
2065                               struct mdd_object *mdd_spobj,
2066                               struct mdd_object *mdd_tpobj,
2067                               struct mdd_object *mdd_sobj,
2068                               struct mdd_object *mdd_tobj,
2069                               const struct lu_name *tname,
2070                               const struct lu_name *sname,
2071                               struct md_attr *ma,
2072                               struct thandle *handle)
2073 {
2074         int rc;
2075
2076         LASSERT(mdd_spobj);
2077         LASSERT(mdd_tpobj);
2078         LASSERT(mdd_sobj);
2079
2080         /* name from source dir */
2081         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2082         if (rc)
2083                 return rc;
2084
2085         /* .. from source child */
2086         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2087                 /* source child can be directory,
2088                  * counted by source dir's nlink */
2089                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2090                 if (rc)
2091                         return rc;
2092
2093                 rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
2094                 if (rc)
2095                         return rc;
2096
2097                 rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
2098                                               dotdot, handle);
2099                 if (rc)
2100                         return rc;
2101
2102                 /* new target child can be directory,
2103                  * counted by target dir's nlink */
2104                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2105                 if (rc)
2106                         return rc;
2107
2108         }
2109
2110         rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
2111         if (rc)
2112                 return rc;
2113
2114         rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
2115         if (rc)
2116                 return rc;
2117         mdd_declare_links_add(env, mdd_sobj, handle);
2118         if (rc)
2119                 return rc;
2120
2121         rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
2122         if (rc)
2123                 return rc;
2124
2125         /* new name */
2126         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2127                         tname->ln_name, handle);
2128         if (rc)
2129                 return rc;
2130
2131         /* name from target dir (old name), we declare it unconditionally
2132          * as mdd_rename() calls delete unconditionally as well. so just
2133          * to balance declarations vs calls to change ... */
2134         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2135         if (rc)
2136                 return rc;
2137
2138         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2139                 /* delete target child in target parent directory */
2140                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2141                 if (rc)
2142                         return rc;
2143
2144                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2145                         /* target child can be directory,
2146                          * delete "." reference in target child directory */
2147                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2148                         if (rc)
2149                                 return rc;
2150
2151                         /* delete ".." reference in target parent directory */
2152                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2153                         if (rc)
2154                                 return rc;
2155                 }
2156
2157                 rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
2158                 if (rc)
2159                         return rc;
2160
2161                 mdd_declare_links_add(env, mdd_tobj, handle);
2162                 if (rc)
2163                         return rc;
2164
2165                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2166                 if (rc)
2167                         return rc;
2168         }
2169
2170         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2171         if (rc)
2172                 return rc;
2173
2174         return rc;
2175 }
2176
2177 /* src object can be remote that is why we use only fid and type of object */
2178 static int mdd_rename(const struct lu_env *env,
2179                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2180                       const struct lu_fid *lf, const struct lu_name *lsname,
2181                       struct md_object *tobj, const struct lu_name *ltname,
2182                       struct md_attr *ma)
2183 {
2184         const char *sname = lsname->ln_name;
2185         const char *tname = ltname->ln_name;
2186         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2187         struct lu_attr    *so_attr = &mdd_env_info(env)->mti_cattr;
2188         struct lu_attr    *tg_attr = &mdd_env_info(env)->mti_pattr;
2189         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2190         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2191         struct mdd_device *mdd = mdo2mdd(src_pobj);
2192         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2193         struct mdd_object *mdd_tobj = NULL;
2194         struct dynlock_handle *sdlh, *tdlh;
2195         struct thandle *handle;
2196         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2197         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2198         bool is_dir;
2199         bool tobj_ref = 0;
2200         bool tobj_locked = 0;
2201         unsigned cl_flags = 0;
2202         int rc, rc2;
2203         ENTRY;
2204
2205         if (tobj)
2206                 mdd_tobj = md2mdd_obj(tobj);
2207
2208         mdd_sobj = mdd_object_find(env, mdd, lf);
2209
2210         handle = mdd_trans_create(env, mdd);
2211         if (IS_ERR(handle))
2212                 GOTO(out_pending, rc = PTR_ERR(handle));
2213
2214         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2215                                 mdd_tobj, lsname, ltname, ma, handle);
2216         if (rc)
2217                 GOTO(stop, rc);
2218
2219         rc = mdd_trans_start(env, mdd, handle);
2220         if (rc)
2221                 GOTO(stop, rc);
2222
2223         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2224         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2225         if (rc < 0)
2226                 GOTO(cleanup_unlocked, rc);
2227
2228         /* Get locks in determined order */
2229         if (rc == MDD_RN_SAME) {
2230                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2231                                           sname, MOR_SRC_PARENT);
2232                 /* check hashes to determine do we need one lock or two */
2233                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2234                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2235                                 MOR_TGT_PARENT);
2236                 else
2237                         tdlh = sdlh;
2238         } else if (rc == MDD_RN_SRCTGT) {
2239                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2240                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2241         } else {
2242                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2243                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2244         }
2245         if (sdlh == NULL || tdlh == NULL)
2246                 GOTO(cleanup, rc = -ENOMEM);
2247
2248         rc = mdd_la_get(env, mdd_sobj, so_attr,
2249                         mdd_object_capa(env, mdd_sobj));
2250         if (rc)
2251                 GOTO(cleanup, rc);
2252
2253         if (mdd_tobj) {
2254                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2255                                 mdd_object_capa(env, mdd_tobj));
2256                 if (rc)
2257                         GOTO(cleanup, rc);
2258         }
2259
2260         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj,
2261                                      mdd_tobj, so_attr, tg_attr);
2262         if (rc)
2263                 GOTO(cleanup, rc);
2264
2265         is_dir = S_ISDIR(so_attr->la_mode);
2266
2267         /* Remove source name from source directory */
2268         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2269                                 mdd_object_capa(env, mdd_spobj));
2270         if (rc)
2271                 GOTO(cleanup, rc);
2272
2273         /* "mv dir1 dir2" needs "dir1/.." link update */
2274         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2275                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2276                                         mdd_object_capa(env, mdd_sobj));
2277                 if (rc)
2278                         GOTO(fixup_spobj2, rc);
2279
2280                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2281                                       handle, mdd_object_capa(env, mdd_sobj));
2282                 if (rc)
2283                         GOTO(fixup_spobj, rc);
2284         }
2285
2286         /* Remove target name from target directory
2287          * Here tobj can be remote one, so we do index_delete unconditionally
2288          * and -ENOENT is allowed.
2289          */
2290         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2291                                 mdd_object_capa(env, mdd_tpobj));
2292         if (rc != 0) {
2293                 if (mdd_tobj) {
2294                         /* tname might been renamed to something else */
2295                         GOTO(fixup_spobj, rc);
2296                 }
2297                 if (rc != -ENOENT)
2298                         GOTO(fixup_spobj, rc);
2299         }
2300
2301         /* Insert new fid with target name into target dir */
2302         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2303                                 mdd_object_capa(env, mdd_tpobj));
2304         if (rc)
2305                 GOTO(fixup_tpobj, rc);
2306
2307         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2308         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2309
2310         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2311         if (mdd_sobj) {
2312                 la->la_valid = LA_CTIME;
2313                 rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
2314                 if (rc)
2315                         GOTO(fixup_tpobj, rc);
2316         }
2317
2318         /* Remove old target object
2319          * For tobj is remote case cmm layer has processed
2320          * and set tobj to NULL then. So when tobj is NOT NULL,
2321          * it must be local one.
2322          */
2323         if (tobj && mdd_object_exists(mdd_tobj)) {
2324                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2325                 tobj_locked = 1;
2326                 if (mdd_is_dead_obj(mdd_tobj)) {
2327                         /* shld not be dead, something is wrong */
2328                         CERROR("tobj is dead, something is wrong\n");
2329                         rc = -EINVAL;
2330                         goto cleanup;
2331                 }
2332                 mdo_ref_del(env, mdd_tobj, handle);
2333
2334                 /* Remove dot reference. */
2335                 if (S_ISDIR(tg_attr->la_mode))
2336                         mdo_ref_del(env, mdd_tobj, handle);
2337                 tobj_ref = 1;
2338
2339                 /* fetch updated nlink */
2340                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2341                                 mdd_object_capa(env, mdd_tobj));
2342                 if (rc != 0) {
2343                         CERROR("%s: Failed to get nlink for tobj "
2344                                 DFID": rc = %d\n",
2345                                 mdd2obd_dev(mdd)->obd_name,
2346                                 PFID(tpobj_fid), rc);
2347                         GOTO(fixup_tpobj, rc);
2348                 }
2349
2350                 la->la_valid = LA_CTIME;
2351                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2352                 if (rc != 0) {
2353                         CERROR("%s: Failed to set ctime for tobj "
2354                                 DFID": rc = %d\n",
2355                                 mdd2obd_dev(mdd)->obd_name,
2356                                 PFID(tpobj_fid), rc);
2357                         GOTO(fixup_tpobj, rc);
2358                 }
2359
2360                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2361                 ma->ma_attr = *tg_attr;
2362                 ma->ma_valid |= MA_INODE;
2363                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2364                 if (rc != 0) {
2365                         CERROR("%s: Failed to unlink tobj "
2366                                 DFID": rc = %d\n",
2367                                 mdd2obd_dev(mdd)->obd_name,
2368                                 PFID(tpobj_fid), rc);
2369                         GOTO(fixup_tpobj, rc);
2370                 }
2371
2372                 /* fetch updated nlink */
2373                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2374                                 mdd_object_capa(env, mdd_tobj));
2375                 if (rc != 0) {
2376                         CERROR("%s: Failed to get nlink for tobj "
2377                                 DFID": rc = %d\n",
2378                                 mdd2obd_dev(mdd)->obd_name,
2379                                 PFID(tpobj_fid), rc);
2380                         GOTO(fixup_tpobj, rc);
2381                 }
2382                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2383                 ma->ma_attr = *tg_attr;
2384                 ma->ma_valid |= MA_INODE;
2385
2386                 if (so_attr->la_nlink == 0)
2387                         cl_flags |= CLF_RENAME_LAST;
2388         }
2389
2390         la->la_valid = LA_CTIME | LA_MTIME;
2391         rc = mdd_attr_check_set_internal(env, mdd_spobj, la, handle, 0);
2392         if (rc)
2393                 GOTO(fixup_tpobj, rc);
2394
2395         if (mdd_spobj != mdd_tpobj) {
2396                 la->la_valid = LA_CTIME | LA_MTIME;
2397                 rc = mdd_attr_check_set_internal(env, mdd_tpobj, la,
2398                                                  handle, 0);
2399         }
2400
2401         if (rc == 0 && mdd_sobj) {
2402                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2403                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2404                                       mdo2fid(mdd_tpobj), ltname, handle, 0, 0);
2405                 if (rc == -ENOENT)
2406                         /* Old files might not have EA entry */
2407                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2408                                       lsname, handle, 0);
2409                 mdd_write_unlock(env, mdd_sobj);
2410                 /* We don't fail the transaction if the link ea can't be
2411                    updated -- fid2path will use alternate lookup method. */
2412                 rc = 0;
2413         }
2414
2415         EXIT;
2416
2417 fixup_tpobj:
2418         if (rc) {
2419                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2420                                          BYPASS_CAPA);
2421                 if (rc2)
2422                         CWARN("tp obj fix error %d\n",rc2);
2423
2424                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2425                     !mdd_is_dead_obj(mdd_tobj)) {
2426                         if (tobj_ref) {
2427                                 mdo_ref_add(env, mdd_tobj, handle);
2428                                 if (is_dir)
2429                                         mdo_ref_add(env, mdd_tobj, handle);
2430                         }
2431
2432                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2433                                          mdo2fid(mdd_tobj), tname,
2434                                          is_dir, handle,
2435                                          BYPASS_CAPA);
2436
2437                         if (rc2)
2438                                 CWARN("tp obj fix error %d\n",rc2);
2439                 }
2440         }
2441
2442 fixup_spobj:
2443         if (rc && is_dir && mdd_sobj) {
2444                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2445                                               BYPASS_CAPA);
2446
2447                 if (rc2)
2448                         CWARN("sp obj dotdot delete error %d\n",rc2);
2449
2450
2451                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2452                                               dotdot, handle, BYPASS_CAPA);
2453                 if (rc2)
2454                         CWARN("sp obj dotdot insert error %d\n",rc2);
2455         }
2456
2457 fixup_spobj2:
2458         if (rc) {
2459                 rc2 = __mdd_index_insert(env, mdd_spobj,
2460                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2461                 if (rc2)
2462                         CWARN("sp obj fix error %d\n",rc2);
2463         }
2464 cleanup:
2465         if (tobj_locked)
2466                 mdd_write_unlock(env, mdd_tobj);
2467         if (likely(tdlh) && sdlh != tdlh)
2468                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2469         if (likely(sdlh))
2470                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2471 cleanup_unlocked:
2472         if (rc == 0)
2473                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2474                                                 mdd_tobj, tpobj_fid, lf,
2475                                                 spobj_fid, ltname, lsname,
2476                                                 handle);
2477
2478 stop:
2479         mdd_trans_stop(env, mdd, rc, handle);
2480 out_pending:
2481         mdd_object_put(env, mdd_sobj);
2482         return rc;
2483 }
2484
2485 int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata)
2486 {
2487         ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2488         if (ldata->ml_buf->lb_buf == NULL)
2489                 return -ENOMEM;
2490         ldata->ml_leh = ldata->ml_buf->lb_buf;
2491         ldata->ml_leh->leh_magic = LINK_EA_MAGIC;
2492         ldata->ml_leh->leh_len = sizeof(struct link_ea_header);
2493         ldata->ml_leh->leh_reccount = 0;
2494         return 0;
2495 }
2496
2497 /** Read the link EA into a temp buffer.
2498  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
2499  * A pointer to the buffer is stored in \a ldata::ml_buf.
2500  *
2501  * \retval 0 or error
2502  */
2503 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
2504                    struct mdd_link_data *ldata)
2505 {
2506         struct lustre_capa *capa;
2507         struct link_ea_header *leh;
2508         int rc;
2509
2510         /* First try a small buf */
2511         LASSERT(env != NULL);
2512         ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2513         if (ldata->ml_buf->lb_buf == NULL)
2514                 return -ENOMEM;
2515
2516         if (!mdd_object_exists(mdd_obj))
2517                 return -ENODATA;
2518
2519         capa = mdd_object_capa(env, mdd_obj);
2520         rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
2521                            XATTR_NAME_LINK, capa);
2522         if (rc == -ERANGE) {
2523                 /* Buf was too small, figure out what we need. */
2524                 mdd_buf_put(ldata->ml_buf);
2525                 rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
2526                                    XATTR_NAME_LINK, capa);
2527                 if (rc < 0)
2528                         return rc;
2529                 ldata->ml_buf = mdd_buf_alloc(env, rc);
2530                 if (ldata->ml_buf->lb_buf == NULL)
2531                         return -ENOMEM;
2532                 rc = mdo_xattr_get(env, mdd_obj, &LU_BUF_NULL,
2533                                    XATTR_NAME_LINK, capa);
2534         }
2535         if (rc < 0)
2536                 return rc;
2537
2538         leh = ldata->ml_buf->lb_buf;
2539         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2540                 leh->leh_magic = LINK_EA_MAGIC;
2541                 leh->leh_reccount = __swab32(leh->leh_reccount);
2542                 leh->leh_len = __swab64(leh->leh_len);
2543                 /* entries are swabbed by mdd_lee_unpack */
2544         }
2545         if (leh->leh_magic != LINK_EA_MAGIC)
2546                 return -EINVAL;
2547         if (leh->leh_reccount == 0)
2548                 return -ENODATA;
2549
2550         ldata->ml_leh = leh;
2551         return 0;
2552 }
2553
2554 /** Read the link EA into a temp buffer.
2555  * Uses the name_buf since it is generally large.
2556  * \retval IS_ERR err
2557  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2558  */
2559 struct lu_buf *mdd_links_get(const struct lu_env *env,
2560                              struct mdd_object *mdd_obj)
2561 {
2562         struct mdd_link_data ldata = { 0 };
2563         int rc;
2564
2565         rc = mdd_links_read(env, mdd_obj, &ldata);
2566         return rc ? ERR_PTR(rc) : ldata.ml_buf;
2567 }
2568
2569 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
2570                     struct mdd_link_data *ldata, struct thandle *handle)
2571 {
2572         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf,
2573                                                      ldata->ml_leh->leh_len);
2574         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
2575                              mdd_object_capa(env, mdd_obj));
2576 }
2577
2578 /** Pack a link_ea_entry.
2579  * All elements are stored as chars to avoid alignment issues.
2580  * Numbers are always big-endian
2581  * \retval record length
2582  */
2583 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2584                         const struct lu_fid *pfid)
2585 {
2586         struct lu_fid   tmpfid;
2587         int             reclen;
2588
2589         fid_cpu_to_be(&tmpfid, pfid);
2590         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
2591                 tmpfid.f_ver = ~0;
2592         memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
2593         memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
2594         reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
2595
2596         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2597         lee->lee_reclen[1] = reclen & 0xff;
2598         return reclen;
2599 }
2600
2601 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2602                     struct lu_name *lname, struct lu_fid *pfid)
2603 {
2604         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2605         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
2606         fid_be_to_cpu(pfid, pfid);
2607         lname->ln_name = lee->lee_name;
2608         lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
2609 }
2610
2611 int mdd_declare_links_add(const struct lu_env *env,
2612                           struct mdd_object *mdd_obj,
2613                           struct thandle *handle)
2614 {
2615         int rc;
2616
2617         /* XXX: max size? */
2618         rc = mdo_declare_xattr_set(env, mdd_obj,
2619                              mdd_buf_get_const(env, NULL, 4096),
2620                              XATTR_NAME_LINK, 0, handle);
2621
2622         return rc;
2623 }
2624
2625 /** Add a record to the end of link ea buf */
2626 int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
2627                       const struct lu_name *lname, const struct lu_fid *pfid)
2628 {
2629         LASSERT(ldata->ml_leh != NULL);
2630
2631         if (lname == NULL || pfid == NULL)
2632                 return -EINVAL;
2633
2634         ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2635         if (ldata->ml_leh->leh_len + ldata->ml_reclen >
2636             ldata->ml_buf->lb_len) {
2637                 if (mdd_buf_grow(env, ldata->ml_leh->leh_len +
2638                                  ldata->ml_reclen) < 0)
2639                         return -ENOMEM;
2640         }
2641
2642         ldata->ml_leh = ldata->ml_buf->lb_buf;
2643         ldata->ml_lee = ldata->ml_buf->lb_buf + ldata->ml_leh->leh_len;
2644         ldata->ml_reclen = mdd_lee_pack(ldata->ml_lee, lname, pfid);
2645         ldata->ml_leh->leh_len += ldata->ml_reclen;
2646         ldata->ml_leh->leh_reccount++;
2647         CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n",
2648                lname->ln_namelen, lname->ln_name);
2649         return 0;
2650 }
2651
2652 /** Del the current record from the link ea buf */
2653 void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
2654                        const struct lu_name *lname)
2655 {
2656         LASSERT(ldata->ml_leh != NULL);
2657
2658         ldata->ml_leh->leh_reccount--;
2659         ldata->ml_leh->leh_len -= ldata->ml_reclen;
2660         memmove(ldata->ml_lee, (char *)ldata->ml_lee + ldata->ml_reclen,
2661                 (char *)ldata->ml_leh + ldata->ml_leh->leh_len -
2662                 (char *)ldata->ml_lee);
2663         CDEBUG(D_INODE, "Old link_ea name '%.*s' is removed\n",
2664                lname->ln_namelen, lname->ln_name);
2665
2666 }
2667
2668 /**
2669  * Check if such a link exists in linkEA.
2670  *
2671  * \param mdd_obj object being handled
2672  * \param pfid parent fid the link to be found for
2673  * \param lname name in the parent's directory entry pointing to this object
2674  * \param ldata link data the search to be done on
2675  *
2676  * \retval   0 success
2677  * \retval -ENOENT link does not exist
2678  * \retval -ve on error
2679  */
2680 int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
2681                    struct mdd_link_data *ldata, const struct lu_name *lname,
2682                    const struct lu_fid  *pfid)
2683 {
2684         struct lu_name *tmpname = &mdd_env_info(env)->mti_name2;
2685         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2686         int count;
2687
2688         LASSERT(ldata->ml_leh != NULL);
2689
2690         /* link #0 */
2691         ldata->ml_lee = (struct link_ea_entry *)(ldata->ml_leh + 1);
2692
2693         for (count = 0; count < ldata->ml_leh->leh_reccount; count++) {
2694                 mdd_lee_unpack(ldata->ml_lee, &ldata->ml_reclen,
2695                                tmpname, tmpfid);
2696                 if (tmpname->ln_namelen == lname->ln_namelen &&
2697                     lu_fid_eq(tmpfid, pfid) &&
2698                     (strncmp(tmpname->ln_name, lname->ln_name,
2699                              tmpname->ln_namelen) == 0))
2700                         break;
2701                 ldata->ml_lee = (struct link_ea_entry *)((char *)ldata->ml_lee +
2702                                                          ldata->ml_reclen);
2703         }
2704
2705         if (count == ldata->ml_leh->leh_reccount) {
2706                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2707                        lname->ln_namelen, lname->ln_name);
2708                 return -ENOENT;
2709         }
2710         return 0;
2711 }
2712
2713 static int __mdd_links_add(const struct lu_env *env,
2714                            struct mdd_object *mdd_obj,
2715                            struct mdd_link_data *ldata,
2716                            const struct lu_name *lname,
2717                            const struct lu_fid *pfid,
2718                            int first, int check)
2719 {
2720         int rc;
2721
2722         if (ldata->ml_leh == NULL) {
2723                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
2724                 if (rc) {
2725                         if (rc != -ENODATA)
2726                                 return rc;
2727                         rc = mdd_links_new(env, ldata);
2728                         if (rc)
2729                                 return rc;
2730                 }
2731         }
2732
2733         if (check) {
2734                 rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
2735                 if (rc && rc != -ENOENT)
2736                         return rc;
2737                 if (rc == 0)
2738                         return -EEXIST;
2739         }
2740
2741         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
2742                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
2743
2744                 *tfid = *pfid;
2745                 tfid->f_ver = ~0;
2746                 mdd_links_add_buf(env, ldata, lname, tfid);
2747         }
2748
2749         return mdd_links_add_buf(env, ldata, lname, pfid);
2750 }
2751
2752 static int __mdd_links_del(const struct lu_env *env,
2753                            struct mdd_object *mdd_obj,
2754                            struct mdd_link_data *ldata,
2755                            const struct lu_name *lname,
2756                            const struct lu_fid *pfid)
2757 {
2758         int rc;
2759
2760         if (ldata->ml_leh == NULL) {
2761                 rc = mdd_links_read(env, mdd_obj, ldata);
2762                 if (rc)
2763                         return rc;
2764         }
2765
2766         rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
2767         if (rc)
2768                 return rc;
2769
2770         mdd_links_del_buf(env, ldata, lname);
2771         return 0;
2772 }
2773
2774 int mdd_links_rename(const struct lu_env *env,
2775                      struct mdd_object *mdd_obj,
2776                      const struct lu_fid *oldpfid,
2777                      const struct lu_name *oldlname,
2778                      const struct lu_fid *newpfid,
2779                      const struct lu_name *newlname,
2780                      struct thandle *handle,
2781                      int first, int check)
2782 {
2783         struct mdd_link_data ldata = { 0 };
2784         int updated = 0;
2785         int rc2 = 0;
2786         int rc = 0;
2787         ENTRY;
2788
2789         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
2790                 return 0;
2791
2792         LASSERT(oldpfid != NULL || newpfid != NULL);
2793
2794         if (mdd_obj->mod_flags & DEAD_OBJ)
2795                 /* No more links, don't bother */
2796                 RETURN(0);
2797
2798         if (oldpfid != NULL) {
2799                 rc = __mdd_links_del(env, mdd_obj, &ldata,
2800                                      oldlname, oldpfid);
2801                 if (rc) {
2802                         if ((check == 0) ||
2803                             (rc != -ENODATA && rc != -ENOENT))
2804                                 GOTO(out, rc);
2805                         /* No changes done. */
2806                         rc = 0;
2807                 } else {
2808                         updated = 1;
2809                 }
2810         }
2811
2812         /* If renaming, add the new record */
2813         if (newpfid != NULL) {
2814                 /* even if the add fails, we still delete the out-of-date
2815                  * old link */
2816                 rc2 = __mdd_links_add(env, mdd_obj, &ldata,
2817                                       newlname, newpfid, first, check);
2818                 if (rc2 == -EEXIST)
2819                         rc2 = 0;
2820                 else if (rc2 == 0)
2821                         updated = 1;
2822         }
2823
2824         if (updated)
2825                 rc = mdd_links_write(env, mdd_obj, &ldata, handle);
2826         EXIT;
2827 out:
2828         if (rc == 0)
2829                 rc = rc2;
2830         if (rc) {
2831                 int error = 1;
2832                 if (rc == -EOVERFLOW || rc == - ENOENT)
2833                         error = 0;
2834                 if (oldpfid == NULL)
2835                         CDEBUG(error ? D_ERROR : D_OTHER,
2836                                "link_ea add '%.*s' failed %d "DFID"\n",
2837                                newlname->ln_namelen, newlname->ln_name,
2838                                rc, PFID(mdd_object_fid(mdd_obj)));
2839                 else if (newpfid == NULL)
2840                         CDEBUG(error ? D_ERROR : D_OTHER,
2841                                "link_ea del '%.*s' failed %d "DFID"\n",
2842                                oldlname->ln_namelen, oldlname->ln_name,
2843                                rc, PFID(mdd_object_fid(mdd_obj)));
2844                 else
2845                         CDEBUG(error ? D_ERROR : D_OTHER,
2846                                "link_ea rename '%.*s'->'%.*s' failed %d "
2847                                DFID"\n",
2848                                oldlname->ln_namelen, oldlname->ln_name,
2849                                newlname->ln_namelen, newlname->ln_name,
2850                                rc, PFID(mdd_object_fid(mdd_obj)));
2851         }
2852
2853         if (ldata.ml_buf && ldata.ml_buf->lb_len > OBD_ALLOC_BIG)
2854                 /* if we vmalloced a large buffer drop it */
2855                 mdd_buf_put(ldata.ml_buf);
2856
2857         return rc;
2858 }
2859
2860 static inline int mdd_links_add(const struct lu_env *env,
2861                                 struct mdd_object *mdd_obj,
2862                                 const struct lu_fid *pfid,
2863                                 const struct lu_name *lname,
2864                                 struct thandle *handle, int first)
2865 {
2866         return mdd_links_rename(env, mdd_obj, NULL, NULL,
2867                                 pfid, lname, handle, first, 0);
2868 }
2869
2870 static inline int mdd_links_del(const struct lu_env *env,
2871                                 struct mdd_object *mdd_obj,
2872                                 const struct lu_fid *pfid,
2873                                 const struct lu_name *lname,
2874                                 struct thandle *handle)
2875 {
2876         return mdd_links_rename(env, mdd_obj, pfid, lname,
2877                                 NULL, NULL, handle, 0, 0);
2878 }
2879
2880 const struct md_dir_operations mdd_dir_ops = {
2881         .mdo_is_subdir     = mdd_is_subdir,
2882         .mdo_lookup        = mdd_lookup,
2883         .mdo_create        = mdd_create,
2884         .mdo_rename        = mdd_rename,
2885         .mdo_link          = mdd_link,
2886         .mdo_unlink        = mdd_unlink,
2887         .mdo_create_data   = mdd_create_data,
2888 };