Whamcloud - gitweb
59051e18cd0005d4c8f2568365ee13b357717327
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
61                         const struct lu_name *lname, struct lu_fid* fid,
62                         int mask);
63 static inline int mdd_links_add(const struct lu_env *env,
64                                 struct mdd_object *mdd_obj,
65                                 const struct lu_fid *pfid,
66                                 const struct lu_name *lname,
67                                 struct thandle *handle, int first);
68 static inline int mdd_links_del(const struct lu_env *env,
69                                 struct mdd_object *mdd_obj,
70                                 const struct lu_fid *pfid,
71                                 const struct lu_name *lname,
72                                 struct thandle *handle);
73
74 static int
75 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
76                     const struct lu_name *lname, struct lu_fid* fid, int mask)
77 {
78         const char *name = lname->ln_name;
79         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
80         struct dynlock_handle *dlh;
81         int rc;
82
83         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
84         if (unlikely(dlh == NULL))
85                 return -ENOMEM;
86         rc = __mdd_lookup(env, pobj, lname, fid, mask);
87         mdd_pdo_read_unlock(env, mdd_obj, dlh);
88
89         return rc;
90 }
91
92 int mdd_lookup(const struct lu_env *env,
93                struct md_object *pobj, const struct lu_name *lname,
94                struct lu_fid* fid, struct md_op_spec *spec)
95 {
96         int rc;
97         ENTRY;
98         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
99         RETURN(rc);
100 }
101
102 int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
103                    struct lu_fid *fid)
104 {
105         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
106 }
107
108 /*
109  * For root fid use special function, which does not compare version component
110  * of fid. Version component is different for root fids on all MDTs.
111  */
112 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
113 {
114         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
115                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
116 }
117
118 /*
119  * return 1: if lf is the fid of the ancestor of p1;
120  * return 0: if not;
121  *
122  * return -EREMOTE: if remote object is found, in this
123  * case fid of remote object is saved to @pf;
124  *
125  * otherwise: values < 0, errors.
126  */
127 static int mdd_is_parent(const struct lu_env *env,
128                          struct mdd_device *mdd,
129                          struct mdd_object *p1,
130                          const struct lu_fid *lf,
131                          struct lu_fid *pf)
132 {
133         struct mdd_object *parent = NULL;
134         struct lu_fid *pfid;
135         int rc;
136         ENTRY;
137
138         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
139         pfid = &mdd_env_info(env)->mti_fid;
140
141         /* Check for root first. */
142         if (mdd_is_root(mdd, mdo2fid(p1)))
143                 RETURN(0);
144
145         for(;;) {
146                 /* this is done recursively, bypass capa for each obj */
147                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
148                 rc = mdd_parent_fid(env, p1, pfid);
149                 if (rc)
150                         GOTO(out, rc);
151                 if (mdd_is_root(mdd, pfid))
152                         GOTO(out, rc = 0);
153                 if (lu_fid_eq(pfid, lf))
154                         GOTO(out, rc = 1);
155                 if (parent)
156                         mdd_object_put(env, parent);
157
158                 parent = mdd_object_find(env, mdd, pfid);
159                 if (IS_ERR(parent)) {
160                         GOTO(out, rc = PTR_ERR(parent));
161                 } else if (mdd_object_remote(parent)) {
162                         /*FIXME: Because of the restriction of rename in Phase I.
163                          * If the parent is remote, we just assumed lf is not the
164                          * parent of P1 for now */
165                         GOTO(out, rc = 0);
166                 }
167                 p1 = parent;
168         }
169         EXIT;
170 out:
171         if (parent && !IS_ERR(parent))
172                 mdd_object_put(env, parent);
173         return rc;
174 }
175
176 /*
177  * No permission check is needed.
178  *
179  * returns 1: if fid is ancestor of @mo;
180  * returns 0: if fid is not a ancestor of @mo;
181  *
182  * returns EREMOTE if remote object is found, fid of remote object is saved to
183  * @fid;
184  *
185  * returns < 0: if error
186  */
187 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
188                   const struct lu_fid *fid, struct lu_fid *sfid)
189 {
190         struct mdd_device *mdd = mdo2mdd(mo);
191         int rc;
192         ENTRY;
193
194         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
195                 RETURN(0);
196
197         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
198         if (rc == 0) {
199                 /* found root */
200                 fid_zero(sfid);
201         } else if (rc == 1) {
202                 /* found @fid is parent */
203                 *sfid = *fid;
204                 rc = 0;
205         }
206         RETURN(rc);
207 }
208
209 /*
210  * Check that @dir contains no entries except (possibly) dot and dotdot.
211  *
212  * Returns:
213  *
214  *             0        empty
215  *      -ENOTDIR        not a directory object
216  *    -ENOTEMPTY        not empty
217  *           -ve        other error
218  *
219  */
220 static int mdd_dir_is_empty(const struct lu_env *env,
221                             struct mdd_object *dir)
222 {
223         struct dt_it     *it;
224         struct dt_object *obj;
225         const struct dt_it_ops *iops;
226         int result;
227         ENTRY;
228
229         obj = mdd_object_child(dir);
230         if (!dt_try_as_dir(env, obj))
231                 RETURN(-ENOTDIR);
232
233         iops = &obj->do_index_ops->dio_it;
234         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
235         if (!IS_ERR(it)) {
236                 result = iops->get(env, it, (const void *)"");
237                 if (result > 0) {
238                         int i;
239                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
240                                 result = iops->next(env, it);
241                         if (result == 0)
242                                 result = -ENOTEMPTY;
243                         else if (result == +1)
244                                 result = 0;
245                 } else if (result == 0)
246                         /*
247                          * Huh? Index contains no zero key?
248                          */
249                         result = -EIO;
250
251                 iops->put(env, it);
252                 iops->fini(env, it);
253         } else
254                 result = PTR_ERR(it);
255         RETURN(result);
256 }
257
258 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
259 {
260         struct mdd_device *m = mdd_obj2mdd_dev(obj);
261         struct lu_attr *la = &mdd_env_info(env)->mti_la;
262         int rc;
263         ENTRY;
264
265         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
266         if (rc)
267                 RETURN(rc);
268
269         /*
270          * Subdir count limitation can be broken through.
271          */
272         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
273             !S_ISDIR(la->la_mode))
274                 RETURN(-EMLINK);
275         else
276                 RETURN(0);
277 }
278
279 /*
280  * Check whether it may create the cobj under the pobj.
281  * cobj maybe NULL
282  */
283 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
284                    struct mdd_object *cobj, int check_perm, int check_nlink)
285 {
286         int rc = 0;
287         ENTRY;
288
289         if (cobj && mdd_object_exists(cobj))
290                 RETURN(-EEXIST);
291
292         if (mdd_is_dead_obj(pobj))
293                 RETURN(-ENOENT);
294
295         if (check_perm)
296                 rc = mdd_permission_internal_locked(env, pobj, NULL,
297                                                     MAY_WRITE | MAY_EXEC,
298                                                     MOR_TGT_PARENT);
299         if (!rc && check_nlink)
300                 rc = __mdd_may_link(env, pobj);
301
302         RETURN(rc);
303 }
304
305 /*
306  * Check whether can unlink from the pobj in the case of "cobj == NULL".
307  */
308 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
309                    const struct lu_attr *attr)
310 {
311         int rc;
312         ENTRY;
313
314         if (mdd_is_dead_obj(pobj))
315                 RETURN(-ENOENT);
316
317         if ((attr->la_valid & LA_FLAGS) &&
318             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
319                 RETURN(-EPERM);
320
321         rc = mdd_permission_internal_locked(env, pobj, NULL,
322                                             MAY_WRITE | MAY_EXEC,
323                                             MOR_TGT_PARENT);
324         if (rc)
325                 RETURN(rc);
326
327         if (mdd_is_append(pobj))
328                 RETURN(-EPERM);
329
330         RETURN(rc);
331 }
332
333 /*
334  * pobj == NULL is remote ops case, under such case, pobj's
335  * VTX feature has been checked already, no need check again.
336  */
337 static inline int mdd_is_sticky(const struct lu_env *env,
338                                 struct mdd_object *pobj,
339                                 struct mdd_object *cobj)
340 {
341         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
342         struct lu_ucred *uc = lu_ucred_assert(env);
343         int rc;
344
345         if (pobj) {
346                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
347                 if (rc)
348                         return rc;
349
350                 if (!(tmp_la->la_mode & S_ISVTX) ||
351                     (tmp_la->la_uid == uc->uc_fsuid))
352                         return 0;
353         }
354
355         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
356         if (rc)
357                 return rc;
358
359         if (tmp_la->la_uid == uc->uc_fsuid)
360                 return 0;
361
362         return !md_capable(uc, CFS_CAP_FOWNER);
363 }
364
365 static int mdd_may_delete_entry(const struct lu_env *env,
366                                 struct mdd_object *pobj, int check_perm)
367 {
368         ENTRY;
369
370         LASSERT(pobj != NULL);
371         if (!mdd_object_exists(pobj))
372                 RETURN(-ENOENT);
373
374         if (mdd_is_dead_obj(pobj))
375                 RETURN(-ENOENT);
376
377         if (check_perm) {
378                 int rc;
379                 rc = mdd_permission_internal_locked(env, pobj, NULL,
380                                             MAY_WRITE | MAY_EXEC,
381                                             MOR_TGT_PARENT);
382                 if (rc)
383                         RETURN(rc);
384         }
385
386         if (mdd_is_append(pobj))
387                 RETURN(-EPERM);
388
389         RETURN(0);
390 }
391
392 /*
393  * Check whether it may delete the cobj from the pobj.
394  * pobj maybe NULL
395  */
396 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
397                    struct mdd_object *cobj, struct lu_attr *cattr,
398                    struct lu_attr *src_attr, int check_perm, int check_empty)
399 {
400         int rc = 0;
401         ENTRY;
402
403         if (pobj) {
404                 rc = mdd_may_delete_entry(env, pobj, check_perm);
405                 if (rc != 0)
406                         RETURN(rc);
407         }
408
409         if (cobj == NULL)
410                 RETURN(0);
411
412         if (!mdd_object_exists(cobj))
413                 RETURN(-ENOENT);
414
415         if (mdd_is_dead_obj(cobj))
416                 RETURN(-ESTALE);
417
418
419         if (mdd_is_sticky(env, pobj, cobj))
420                 RETURN(-EPERM);
421
422         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
423                 RETURN(-EPERM);
424
425         if ((cattr->la_valid & LA_FLAGS) &&
426             (cattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
427                 RETURN(-EPERM);
428
429         /* additional check the rename case */
430         if (src_attr) {
431                 if (S_ISDIR(src_attr->la_mode)) {
432                         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
433
434                         if (!S_ISDIR(cattr->la_mode))
435                                 RETURN(-ENOTDIR);
436
437                         if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
438                                 RETURN(-EBUSY);
439                 } else if (S_ISDIR(cattr->la_mode))
440                         RETURN(-EISDIR);
441         }
442
443         if (S_ISDIR(cattr->la_mode) && check_empty)
444                 rc = mdd_dir_is_empty(env, cobj);
445
446         RETURN(rc);
447 }
448
449 /*
450  * tgt maybe NULL
451  * has mdd_write_lock on src already, but not on tgt yet
452  */
453 int mdd_link_sanity_check(const struct lu_env *env,
454                           struct mdd_object *tgt_obj,
455                           const struct lu_name *lname,
456                           struct mdd_object *src_obj)
457 {
458         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
459         int rc = 0;
460         ENTRY;
461
462         if (!mdd_object_exists(src_obj))
463                 RETURN(-ENOENT);
464
465         if (mdd_is_dead_obj(src_obj))
466                 RETURN(-ESTALE);
467
468         /* Local ops, no lookup before link, check filename length here. */
469         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
470                 RETURN(-ENAMETOOLONG);
471
472         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
473                 RETURN(-EPERM);
474
475         if (S_ISDIR(mdd_object_type(src_obj)))
476                 RETURN(-EPERM);
477
478         LASSERT(src_obj != tgt_obj);
479         if (tgt_obj) {
480                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
481                 if (rc)
482                         RETURN(rc);
483         }
484
485         rc = __mdd_may_link(env, src_obj);
486
487         RETURN(rc);
488 }
489
490 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
491                                    const char *name, struct thandle *handle,
492                                    struct lustre_capa *capa)
493 {
494         struct dt_object *next = mdd_object_child(pobj);
495         int               rc;
496         ENTRY;
497
498         if (dt_try_as_dir(env, next)) {
499                 rc = next->do_index_ops->dio_delete(env, next,
500                                                     (struct dt_key *)name,
501                                                     handle, capa);
502         } else
503                 rc = -ENOTDIR;
504
505         RETURN(rc);
506 }
507
508 static int __mdd_index_insert_only(const struct lu_env *env,
509                                    struct mdd_object *pobj,
510                                    const struct lu_fid *lf, const char *name,
511                                    struct thandle *handle,
512                                    struct lustre_capa *capa)
513 {
514         struct dt_object *next = mdd_object_child(pobj);
515         int               rc;
516         ENTRY;
517
518         if (dt_try_as_dir(env, next)) {
519                 struct lu_ucred  *uc = lu_ucred_check(env);
520                 int ignore_quota;
521
522                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
523                 rc = next->do_index_ops->dio_insert(env, next,
524                                                     (struct dt_rec*)lf,
525                                                     (const struct dt_key *)name,
526                                                     handle, capa, ignore_quota);
527         } else {
528                 rc = -ENOTDIR;
529         }
530         RETURN(rc);
531 }
532
533 /* insert named index, add reference if isdir */
534 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
535                               const struct lu_fid *lf, const char *name, int is_dir,
536                               struct thandle *handle, struct lustre_capa *capa)
537 {
538         int               rc;
539         ENTRY;
540
541         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
542         if (rc == 0 && is_dir) {
543                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
544                 mdo_ref_add(env, pobj, handle);
545                 mdd_write_unlock(env, pobj);
546         }
547         RETURN(rc);
548 }
549
550 /* delete named index, drop reference if isdir */
551 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
552                               const char *name, int is_dir, struct thandle *handle,
553                               struct lustre_capa *capa)
554 {
555         int               rc;
556         ENTRY;
557
558         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
559         if (rc == 0 && is_dir) {
560                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
561                 mdo_ref_del(env, pobj, handle);
562                 mdd_write_unlock(env, pobj);
563         }
564
565         RETURN(rc);
566 }
567
568 int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
569                             int reclen, struct thandle *handle)
570 {
571         int rc;
572
573         /* XXX: this is a temporary solution to declare llog changes
574          *      will be fixed in 2.3 with new llog implementation */
575
576         LASSERT(mdd->mdd_capa);
577
578         /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
579          *      have to set the parameter 'size' as INT_MAX or 0 to inform
580          *      OSD that this record write is for a llog write or catalog
581          *      header update, and osd declare function will reserve less
582          *      credits for optimization purpose.
583          *
584          *      Reserve 6 blocks for a llog write, since the llog file is
585          *      usually small, reserve 2 blocks for catalog header update,
586          *      because we know for sure that catalog header is already
587          *      allocated.
588          *
589          *      This hack should be removed in 2.3.
590          */
591
592         /* record itself */
593         rc = dt_declare_record_write(env, mdd->mdd_capa,
594                                      DECLARE_LLOG_WRITE, 0, handle);
595         if (rc)
596                 return rc;
597
598         /* header will be updated as well */
599         rc = dt_declare_record_write(env, mdd->mdd_capa,
600                                      DECLARE_LLOG_WRITE, 0, handle);
601         if (rc)
602                 return rc;
603
604         /* also we should be able to create new plain log */
605         rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
606         if (rc)
607                 return rc;
608
609         /* new record referencing new plain llog */
610         rc = dt_declare_record_write(env, mdd->mdd_capa,
611                                      DECLARE_LLOG_WRITE, 0, handle);
612         if (rc)
613                 return rc;
614
615         /* catalog's header will be updated as well */
616         rc = dt_declare_record_write(env, mdd->mdd_capa,
617                                      DECLARE_LLOG_REWRITE, 0, handle);
618
619         return rc;
620 }
621
622 int mdd_declare_changelog_store(const struct lu_env *env,
623                                 struct mdd_device *mdd,
624                                 const struct lu_name *fname,
625                                 struct thandle *handle)
626 {
627         struct obd_device               *obd = mdd2obd_dev(mdd);
628         struct llog_ctxt                *ctxt;
629         struct llog_changelog_rec       *rec;
630         struct lu_buf                   *buf;
631         int                              reclen;
632         int                              rc;
633
634         /* Not recording */
635         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
636                 return 0;
637
638         reclen = llog_data_len(sizeof(*rec) +
639                                (fname != NULL ? fname->ln_namelen : 0));
640         buf = mdd_buf_alloc(env, reclen);
641         if (buf->lb_buf == NULL)
642                 return -ENOMEM;
643
644         rec = buf->lb_buf;
645         rec->cr_hdr.lrh_len = reclen;
646         rec->cr_hdr.lrh_type = CHANGELOG_REC;
647
648         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
649         if (ctxt == NULL)
650                 return -ENXIO;
651
652         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
653         llog_ctxt_put(ctxt);
654
655         return rc;
656 }
657
658 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
659                                            struct mdd_device *mdd,
660                                            const struct lu_name *tname,
661                                            const struct lu_name *sname,
662                                            struct thandle *handle)
663 {
664         struct obd_device               *obd = mdd2obd_dev(mdd);
665         struct llog_ctxt                *ctxt;
666         struct llog_changelog_ext_rec   *rec;
667         struct lu_buf                   *buf;
668         int                              reclen;
669         int                              rc;
670
671         /* Not recording */
672         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
673                 return 0;
674
675         reclen = llog_data_len(sizeof(*rec) +
676                                (tname != NULL ? tname->ln_namelen : 0) +
677                                (sname != NULL ? 1 + sname->ln_namelen : 0));
678         buf = mdd_buf_alloc(env, reclen);
679         if (buf->lb_buf == NULL)
680                 return -ENOMEM;
681
682         rec = buf->lb_buf;
683         rec->cr_hdr.lrh_len = reclen;
684         rec->cr_hdr.lrh_type = CHANGELOG_REC;
685
686         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
687         if (ctxt == NULL)
688                 return -ENXIO;
689
690         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
691         llog_ctxt_put(ctxt);
692
693         return rc;
694 }
695
696 /** Add a changelog entry \a rec to the changelog llog
697  * \param mdd
698  * \param rec
699  * \param handle - currently ignored since llogs start their own transaction;
700  *                 this will hopefully be fixed in llog rewrite
701  * \retval 0 ok
702  */
703 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
704                         struct llog_changelog_rec *rec, struct thandle *th)
705 {
706         struct obd_device       *obd = mdd2obd_dev(mdd);
707         struct llog_ctxt        *ctxt;
708         int                      rc;
709
710         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
711         rec->cr_hdr.lrh_type = CHANGELOG_REC;
712         rec->cr.cr_time = cl_time();
713
714         spin_lock(&mdd->mdd_cl.mc_lock);
715         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
716          * but as long as the MDD transactions are ordered correctly for e.g.
717          * rename conflicts, I don't think this should matter. */
718         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
719         spin_unlock(&mdd->mdd_cl.mc_lock);
720
721         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
722         if (ctxt == NULL)
723                 return -ENXIO;
724
725         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
726         llog_ctxt_put(ctxt);
727         if (rc > 0)
728                 rc = 0;
729         return rc;
730 }
731
732 /** Add a changelog_ext entry \a rec to the changelog llog
733  * \param mdd
734  * \param rec
735  * \param handle - currently ignored since llogs start their own transaction;
736  *              this will hopefully be fixed in llog rewrite
737  * \retval 0 ok
738  */
739 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
740                             struct llog_changelog_ext_rec *rec,
741                             struct thandle *th)
742 {
743         struct obd_device       *obd = mdd2obd_dev(mdd);
744         struct llog_ctxt        *ctxt;
745         int                      rc;
746
747         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
748         /* llog_lvfs_write_rec sets the llog tail len */
749         rec->cr_hdr.lrh_type = CHANGELOG_REC;
750         rec->cr.cr_time = cl_time();
751
752         spin_lock(&mdd->mdd_cl.mc_lock);
753         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
754          * but as long as the MDD transactions are ordered correctly for e.g.
755          * rename conflicts, I don't think this should matter. */
756         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
757         spin_unlock(&mdd->mdd_cl.mc_lock);
758
759         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
760         if (ctxt == NULL)
761                 return -ENXIO;
762
763         /* nested journal transaction */
764         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
765         llog_ctxt_put(ctxt);
766         if (rc > 0)
767                 rc = 0;
768
769         return rc;
770 }
771
772 /** Store a namespace change changelog record
773  * If this fails, we must fail the whole transaction; we don't
774  * want the change to commit without the log entry.
775  * \param target - mdd_object of change
776  * \param parent - parent dir/object
777  * \param tname - target name string
778  * \param handle - transacion handle
779  */
780 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
781                            enum changelog_rec_type type, unsigned flags,
782                            struct mdd_object *target, struct mdd_object *parent,
783                            const struct lu_name *tname, struct thandle *handle)
784 {
785         struct llog_changelog_rec *rec;
786         struct lu_buf *buf;
787         int reclen;
788         int rc;
789         ENTRY;
790
791         /* Not recording */
792         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
793                 RETURN(0);
794         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
795                 RETURN(0);
796
797         LASSERT(target != NULL);
798         LASSERT(parent != NULL);
799         LASSERT(tname != NULL);
800         LASSERT(handle != NULL);
801
802         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
803         buf = mdd_buf_alloc(env, reclen);
804         if (buf->lb_buf == NULL)
805                 RETURN(-ENOMEM);
806         rec = buf->lb_buf;
807
808         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
809         rec->cr.cr_type = (__u32)type;
810         rec->cr.cr_tfid = *mdo2fid(target);
811         rec->cr.cr_pfid = *mdo2fid(parent);
812         rec->cr.cr_namelen = tname->ln_namelen;
813         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
814
815         target->mod_cltime = cfs_time_current_64();
816
817         rc = mdd_changelog_store(env, mdd, rec, handle);
818         if (rc < 0) {
819                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
820                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
821                         PFID(&rec->cr.cr_pfid));
822                 RETURN(-EFAULT);
823         }
824
825         RETURN(0);
826 }
827
828
829 /** Store a namespace change changelog record
830  * If this fails, we must fail the whole transaction; we don't
831  * want the change to commit without the log entry.
832  * \param target - mdd_object of change
833  * \param tpfid - target parent dir/object fid
834  * \param sfid - source object fid
835  * \param spfid - source parent fid
836  * \param tname - target name string
837  * \param sname - source name string
838  * \param handle - transacion handle
839  */
840 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
841                                       struct mdd_device    *mdd,
842                                       enum changelog_rec_type type,
843                                       unsigned flags,
844                                       struct mdd_object    *target,
845                                       const struct lu_fid  *tpfid,
846                                       const struct lu_fid  *sfid,
847                                       const struct lu_fid  *spfid,
848                                       const struct lu_name *tname,
849                                       const struct lu_name *sname,
850                                       struct thandle *handle)
851 {
852         struct llog_changelog_ext_rec *rec;
853         struct lu_buf *buf;
854         int reclen;
855         int rc;
856         ENTRY;
857
858         /* Not recording */
859         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
860                 RETURN(0);
861         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
862                 RETURN(0);
863
864         LASSERT(sfid != NULL);
865         LASSERT(tpfid != NULL);
866         LASSERT(tname != NULL);
867         LASSERT(handle != NULL);
868
869         reclen = llog_data_len(sizeof(*rec) +
870                                sname != NULL ? 1 + sname->ln_namelen : 0);
871         buf = mdd_buf_alloc(env, reclen);
872         if (buf->lb_buf == NULL)
873                 RETURN(-ENOMEM);
874         rec = buf->lb_buf;
875
876         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
877         rec->cr.cr_type = (__u32)type;
878         rec->cr.cr_pfid = *tpfid;
879         rec->cr.cr_sfid = *sfid;
880         rec->cr.cr_spfid = *spfid;
881         rec->cr.cr_namelen = tname->ln_namelen;
882         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
883         if (sname) {
884                 rec->cr.cr_name[tname->ln_namelen] = '\0';
885                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
886                         sname->ln_namelen);
887                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
888         }
889
890         if (likely(target != NULL)) {
891                 rec->cr.cr_tfid = *mdo2fid(target);
892                 target->mod_cltime = cfs_time_current_64();
893         } else {
894                 fid_zero(&rec->cr.cr_tfid);
895         }
896
897         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
898         if (rc < 0) {
899                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
900                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
901                 return -EFAULT;
902         }
903
904         return 0;
905 }
906
907 static int mdd_declare_link(const struct lu_env *env,
908                             struct mdd_device *mdd,
909                             struct mdd_object *p,
910                             struct mdd_object *c,
911                             const struct lu_name *name,
912                             struct thandle *handle)
913 {
914         int rc;
915
916         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
917         if (rc)
918                 return rc;
919
920         rc = mdo_declare_ref_add(env, c, handle);
921         if (rc)
922                 return rc;
923
924         rc = mdo_declare_attr_set(env, p, NULL, handle);
925         if (rc)
926                 return rc;
927
928         rc = mdo_declare_attr_set(env, c, NULL, handle);
929         if (rc)
930                 return rc;
931
932         rc = mdd_declare_links_add(env, c, handle);
933         if (rc)
934                 return rc;
935
936         rc = mdd_declare_changelog_store(env, mdd, name, handle);
937
938         return rc;
939 }
940
941 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
942                     struct md_object *src_obj, const struct lu_name *lname,
943                     struct md_attr *ma)
944 {
945         const char *name = lname->ln_name;
946         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
947         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
948         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
949         struct mdd_device *mdd = mdo2mdd(src_obj);
950         struct dynlock_handle *dlh;
951         struct thandle *handle;
952         int rc;
953         ENTRY;
954
955         handle = mdd_trans_create(env, mdd);
956         if (IS_ERR(handle))
957                 GOTO(out_pending, rc = PTR_ERR(handle));
958
959         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
960         if (rc)
961                 GOTO(stop, rc);
962
963         rc = mdd_trans_start(env, mdd, handle);
964         if (rc)
965                 GOTO(stop, rc);
966
967         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
968         if (dlh == NULL)
969                 GOTO(out_trans, rc = -ENOMEM);
970         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
971
972         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
973         if (rc)
974                 GOTO(out_unlock, rc);
975
976         rc = mdo_ref_add(env, mdd_sobj, handle);
977         if (rc)
978                 GOTO(out_unlock, rc);
979
980
981         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
982                                      name, handle,
983                                      mdd_object_capa(env, mdd_tobj));
984         if (rc != 0) {
985                 mdo_ref_del(env, mdd_sobj, handle);
986                 GOTO(out_unlock, rc);
987         }
988
989         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
990         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
991
992         la->la_valid = LA_CTIME | LA_MTIME;
993         rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
994         if (rc)
995                 GOTO(out_unlock, rc);
996
997         la->la_valid = LA_CTIME;
998         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
999         if (rc == 0) {
1000                 mdd_links_add(env, mdd_sobj,
1001                               mdo2fid(mdd_tobj), lname, handle, 0);
1002         }
1003
1004         EXIT;
1005 out_unlock:
1006         mdd_write_unlock(env, mdd_sobj);
1007         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
1008 out_trans:
1009         if (rc == 0)
1010                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1011                                             mdd_tobj, lname, handle);
1012 stop:
1013         mdd_trans_stop(env, mdd, rc, handle);
1014 out_pending:
1015         return rc;
1016 }
1017
1018 int mdd_declare_finish_unlink(const struct lu_env *env,
1019                               struct mdd_object *obj,
1020                               struct md_attr *ma,
1021                               struct thandle *handle)
1022 {
1023         int     rc;
1024
1025         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1026         if (rc)
1027                 return rc;
1028
1029         return mdo_declare_destroy(env, obj, handle);
1030 }
1031
1032 /* caller should take a lock before calling */
1033 int mdd_finish_unlink(const struct lu_env *env,
1034                       struct mdd_object *obj, struct md_attr *ma,
1035                       struct thandle *th)
1036 {
1037         int rc = 0;
1038         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1039         ENTRY;
1040
1041         LASSERT(mdd_write_locked(env, obj) != 0);
1042
1043         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
1044                 obj->mod_flags |= DEAD_OBJ;
1045                 /* add new orphan and the object
1046                  * will be deleted during mdd_close() */
1047                 if (obj->mod_count) {
1048                         rc = __mdd_orphan_add(env, obj, th);
1049                         if (rc == 0)
1050                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1051                                         "orphan list, open count = %d\n",
1052                                         PFID(mdd_object_fid(obj)),
1053                                         obj->mod_count);
1054                         else
1055                                 CERROR("Object "DFID" fail to be an orphan, "
1056                                        "open count = %d, maybe cause failed "
1057                                        "open replay\n",
1058                                         PFID(mdd_object_fid(obj)),
1059                                         obj->mod_count);
1060                 } else {
1061                         rc = mdo_destroy(env, obj, th);
1062                 }
1063         }
1064
1065         RETURN(rc);
1066 }
1067
1068 /*
1069  * pobj maybe NULL
1070  * has mdd_write_lock on cobj already, but not on pobj yet
1071  */
1072 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1073                             struct mdd_object *cobj, struct lu_attr *cattr)
1074 {
1075         int rc;
1076         ENTRY;
1077
1078         rc = mdd_may_delete(env, pobj, cobj, cattr, NULL, 1, 1);
1079
1080         RETURN(rc);
1081 }
1082
1083 static inline int mdd_declare_links_del(const struct lu_env *env,
1084                                         struct mdd_object *c,
1085                                         struct thandle *handle)
1086 {
1087         int rc = 0;
1088
1089         /* For directory, the linkEA will be removed together with the object. */
1090         if (!S_ISDIR(mdd_object_type(c)))
1091                 rc = mdd_declare_links_add(env, c, handle);
1092
1093         return rc;
1094 }
1095
1096 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1097                               struct mdd_object *p, struct mdd_object *c,
1098                               const struct lu_name *name, struct md_attr *ma,
1099                               struct thandle *handle, int no_name)
1100 {
1101         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1102         int rc;
1103
1104         if (likely(no_name == 0)) {
1105                 rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1106                 if (rc)
1107                         return rc;
1108         }
1109
1110         rc = mdo_declare_ref_del(env, p, handle);
1111         if (rc)
1112                 return rc;
1113
1114         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1115         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1116         la->la_valid = LA_CTIME | LA_MTIME;
1117         rc = mdo_declare_attr_set(env, p, la, handle);
1118         if (rc)
1119                 return rc;
1120
1121         if (c != NULL) {
1122                 rc = mdo_declare_ref_del(env, c, handle);
1123                 if (rc)
1124                         return rc;
1125
1126                 rc = mdo_declare_ref_del(env, c, handle);
1127                 if (rc)
1128                         return rc;
1129
1130                 rc = mdo_declare_attr_set(env, c, NULL, handle);
1131                 if (rc)
1132                         return rc;
1133
1134                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1135                 if (rc)
1136                         return rc;
1137
1138                 rc = mdd_declare_links_del(env, c, handle);
1139                 if (rc != 0)
1140                         return rc;
1141
1142                 /* FIXME: need changelog for remove entry */
1143                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1144         }
1145
1146         return rc;
1147 }
1148
1149 /**
1150  * Delete name entry and the object.
1151  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1152  * does not exist for this object, and it could only happen during resending
1153  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1154  * is also needed in this case(needed by changelog), so we have to add another
1155  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1156  * the ENOENT failure should be able to be fixed by redo mechanism.
1157  */
1158 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1159                       struct md_object *cobj, const struct lu_name *lname,
1160                       struct md_attr *ma, int no_name)
1161 {
1162         const char *name = lname->ln_name;
1163         struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
1164         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1165         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1166         struct mdd_object *mdd_cobj = NULL;
1167         struct mdd_device *mdd = mdo2mdd(pobj);
1168         struct dynlock_handle *dlh;
1169         struct thandle    *handle;
1170         int rc, is_dir = 0;
1171         ENTRY;
1172
1173         /* cobj == NULL means only delete name entry */
1174         if (likely(cobj != NULL)) {
1175                 mdd_cobj = md2mdd_obj(cobj);
1176                 if (mdd_object_exists(mdd_cobj) == 0)
1177                         RETURN(-ENOENT);
1178                 /* currently it is assume, it could only delete
1179                  * name entry of remote directory */
1180                 is_dir = 1;
1181         }
1182
1183         handle = mdd_trans_create(env, mdd);
1184         if (IS_ERR(handle))
1185                 RETURN(PTR_ERR(handle));
1186
1187         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1188                                 lname, ma, handle, no_name);
1189         if (rc)
1190                 GOTO(stop, rc);
1191
1192         rc = mdd_trans_start(env, mdd, handle);
1193         if (rc)
1194                 GOTO(stop, rc);
1195
1196         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1197         if (dlh == NULL)
1198                 GOTO(stop, rc = -ENOMEM);
1199
1200         if (likely(mdd_cobj != NULL)) {
1201                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1202
1203                 /* fetch cattr */
1204                 rc = mdd_la_get(env, mdd_cobj, cattr,
1205                                 mdd_object_capa(env, mdd_cobj));
1206                 if (rc)
1207                         GOTO(cleanup, rc);
1208
1209                 is_dir = S_ISDIR(cattr->la_mode);
1210
1211         }
1212
1213         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
1214         if (rc)
1215                 GOTO(cleanup, rc);
1216
1217         if (likely(no_name == 0)) {
1218                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1219                                         mdd_object_capa(env, mdd_pobj));
1220                 if (rc)
1221                         GOTO(cleanup, rc);
1222         }
1223
1224         if (likely(mdd_cobj != NULL)) {
1225                 rc = mdo_ref_del(env, mdd_cobj, handle);
1226                 if (rc != 0) {
1227                         __mdd_index_insert_only(env, mdd_pobj,
1228                                                 mdo2fid(mdd_cobj),
1229                                                 name, handle,
1230                                                 mdd_object_capa(env, mdd_pobj));
1231                         GOTO(cleanup, rc);
1232                 }
1233
1234                 if (is_dir)
1235                         /* unlink dot */
1236                         mdo_ref_del(env, mdd_cobj, handle);
1237
1238                 /* fetch updated nlink */
1239                 rc = mdd_la_get(env, mdd_cobj, cattr,
1240                                 mdd_object_capa(env, mdd_cobj));
1241                 if (rc)
1242                         GOTO(cleanup, rc);
1243         }
1244
1245         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1246         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1247
1248         la->la_valid = LA_CTIME | LA_MTIME;
1249         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1250         if (rc)
1251                 GOTO(cleanup, rc);
1252
1253         /* Enough for only unlink the entry */
1254         if (unlikely(mdd_cobj == NULL)) {
1255                 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1256                 GOTO(stop, rc);
1257         }
1258
1259         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1260                 /* update ctime of an unlinked file only if it is still
1261                  * opened or a link still exists */
1262                 la->la_valid = LA_CTIME;
1263                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1264                 if (rc)
1265                         GOTO(cleanup, rc);
1266         }
1267
1268         /* XXX: this transfer to ma will be removed with LOD/OSP */
1269         ma->ma_attr = *cattr;
1270         ma->ma_valid |= MA_INODE;
1271         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1272
1273         /* fetch updated nlink */
1274         if (rc == 0)
1275                 rc = mdd_la_get(env, mdd_cobj, cattr,
1276                                 mdd_object_capa(env, mdd_cobj));
1277
1278         if (!is_dir)
1279                 /* old files may not have link ea; ignore errors */
1280                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1281
1282         /* if object is removed then we can't get its attrs, use last get */
1283         if (cattr->la_nlink == 0) {
1284                 ma->ma_attr = *cattr;
1285                 ma->ma_valid |= MA_INODE;
1286         }
1287         EXIT;
1288 cleanup:
1289         mdd_write_unlock(env, mdd_cobj);
1290         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1291         if (rc == 0) {
1292                 int cl_flags;
1293
1294                 cl_flags = (cattr->la_nlink == 0) ? CLF_UNLINK_LAST : 0;
1295                 if ((ma->ma_valid & MA_HSM) &&
1296                     (ma->ma_hsm.mh_flags & HS_EXISTS))
1297                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1298
1299                 rc = mdd_changelog_ns_store(env, mdd,
1300                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1301                         mdd_cobj, mdd_pobj, lname, handle);
1302         }
1303
1304 stop:
1305         mdd_trans_stop(env, mdd, rc, handle);
1306
1307         return rc;
1308 }
1309
1310 /*
1311  * The permission has been checked when obj created, no need check again.
1312  */
1313 static int mdd_cd_sanity_check(const struct lu_env *env,
1314                                struct mdd_object *obj)
1315 {
1316         ENTRY;
1317
1318         /* EEXIST check */
1319         if (!obj || mdd_is_dead_obj(obj))
1320                 RETURN(-ENOENT);
1321
1322         RETURN(0);
1323
1324 }
1325
1326 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1327                            struct md_object *cobj, const struct md_op_spec *spec,
1328                            struct md_attr *ma)
1329 {
1330         struct mdd_device *mdd = mdo2mdd(cobj);
1331         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1332         struct mdd_object *son = md2mdd_obj(cobj);
1333         struct thandle    *handle;
1334         const struct lu_buf *buf;
1335         struct lu_attr    *attr = &mdd_env_info(env)->mti_cattr;
1336         int                rc;
1337         ENTRY;
1338
1339         /* do not let users to create stripes via .lustre/
1340          * mdd_obf_setup() sets IMMUTE_OBJ on this directory */
1341         if (pobj && mdd_pobj->mod_flags & IMMUTE_OBJ)
1342                 RETURN(-ENOENT);
1343
1344         rc = mdd_cd_sanity_check(env, son);
1345         if (rc)
1346                 RETURN(rc);
1347
1348         if (!md_should_create(spec->sp_cr_flags))
1349                 RETURN(0);
1350
1351         /*
1352          * there are following use cases for this function:
1353          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1354          *    striping can be specified or not
1355          * 2) CMD?
1356          */
1357         rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
1358         if (rc)
1359                 RETURN(rc);
1360
1361         /* calling ->ah_make_hint() is used to transfer information from parent */
1362         mdd_object_make_hint(env, mdd_pobj, son, attr);
1363
1364         handle = mdd_trans_create(env, mdd);
1365         if (IS_ERR(handle))
1366                 GOTO(out_free, rc = PTR_ERR(handle));
1367
1368         /*
1369          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1370          * Should this be fixed?
1371          */
1372         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
1373                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1374                spec->sp_cr_flags, spec->no_create);
1375
1376         if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
1377                 /* replay case or lfs setstripe */
1378                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1379                                         spec->u.sp_ea.eadatalen);
1380         } else {
1381                 buf = &LU_BUF_NULL;
1382         }
1383
1384         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1385                                   XATTR_NAME_LOV, 0, handle);
1386         if (rc)
1387                 GOTO(stop, rc);
1388
1389         rc = mdd_trans_start(env, mdd, handle);
1390         if (rc)
1391                 GOTO(stop, rc);
1392
1393         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1394                           0, handle, mdd_object_capa(env, son));
1395 stop:
1396         mdd_trans_stop(env, mdd, rc, handle);
1397 out_free:
1398         RETURN(rc);
1399 }
1400
1401 /* Get fid from name and parent */
1402 static int
1403 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1404              const struct lu_name *lname, struct lu_fid* fid, int mask)
1405 {
1406         const char          *name = lname->ln_name;
1407         const struct dt_key *key = (const struct dt_key *)name;
1408         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1409         struct mdd_device   *m = mdo2mdd(pobj);
1410         struct dt_object    *dir = mdd_object_child(mdd_obj);
1411         int rc;
1412         ENTRY;
1413
1414         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1415                 RETURN(-ESTALE);
1416
1417         if (mdd_object_remote(mdd_obj)) {
1418                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
1419                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
1420         } else if (!mdd_object_exists(mdd_obj)) {
1421                 RETURN(-ESTALE);
1422         }
1423
1424         /* The common filename length check. */
1425         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1426                 RETURN(-ENAMETOOLONG);
1427
1428         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1429                                             MOR_TGT_PARENT);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1434                    dt_try_as_dir(env, dir))) {
1435
1436                 rc = dir->do_index_ops->dio_lookup(env, dir,
1437                                                  (struct dt_rec *)fid, key,
1438                                                  mdd_object_capa(env, mdd_obj));
1439                 if (rc > 0)
1440                         rc = 0;
1441                 else if (rc == 0)
1442                         rc = -ENOENT;
1443         } else
1444                 rc = -ENOTDIR;
1445
1446         RETURN(rc);
1447 }
1448
1449 static int mdd_declare_object_initialize(const struct lu_env *env,
1450                                          struct mdd_object *parent,
1451                                          struct mdd_object *child,
1452                                          struct lu_attr *attr,
1453                                          struct thandle *handle)
1454 {
1455         int rc;
1456         ENTRY;
1457
1458         /*
1459          * inode mode has been set in creation time, and it's based on umask,
1460          * la_mode and acl, don't set here again! (which will go wrong
1461          * because below function doesn't consider umask).
1462          * I'd suggest set all object attributes in creation time, see above.
1463          */
1464         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1465         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1466         rc = mdo_declare_attr_set(env, child, attr, handle);
1467         attr->la_valid |= LA_MODE | LA_TYPE;
1468         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1469                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1470                                               dot, handle);
1471                 if (rc == 0)
1472                         rc = mdo_declare_ref_add(env, child, handle);
1473
1474                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1475                                               dotdot, handle);
1476         }
1477
1478         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1479                         fid_is_dot_lustre(mdo2fid(child)) ||
1480                         fid_is_root(mdo2fid(child))))
1481                 mdd_declare_links_add(env, child, handle);
1482
1483         RETURN(rc);
1484 }
1485
1486 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
1487                           const struct lu_name *lname, struct mdd_object *child,
1488                           const struct lu_attr *attr, struct thandle *handle,
1489                           const struct md_op_spec *spec)
1490 {
1491         int rc;
1492         ENTRY;
1493
1494         /*
1495          * Update attributes for child.
1496          *
1497          * FIXME:
1498          *  (1) the valid bits should be converted between Lustre and Linux;
1499          *  (2) maybe, the child attributes should be set in OSD when creation.
1500          */
1501
1502         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1503         /* arguments are supposed to stay the same */
1504         if (S_ISDIR(attr->la_mode)) {
1505                 /* Add "." and ".." for newly created dir */
1506                 mdo_ref_add(env, child, handle);
1507                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1508                                              dot, handle, BYPASS_CAPA);
1509                 if (rc == 0)
1510                         rc = __mdd_index_insert_only(env, child, pfid,
1511                                                      dotdot, handle,
1512                                                      BYPASS_CAPA);
1513                 if (rc != 0)
1514                         mdo_ref_del(env, child, handle);
1515         }
1516
1517         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1518                         fid_is_dot_lustre(mdo2fid(child)) ||
1519                         fid_is_root(mdo2fid(child))))
1520                 mdd_links_add(env, child, pfid, lname, handle, 1);
1521
1522         RETURN(rc);
1523 }
1524
1525 /* has not lock on pobj yet */
1526 static int mdd_create_sanity_check(const struct lu_env *env,
1527                                    struct md_object *pobj,
1528                                    struct lu_attr *pattr,
1529                                    const struct lu_name *lname,
1530                                    struct lu_attr *cattr,
1531                                    struct md_op_spec *spec)
1532 {
1533         struct mdd_thread_info *info = mdd_env_info(env);
1534         struct lu_fid     *fid       = &info->mti_fid;
1535         struct mdd_object *obj       = md2mdd_obj(pobj);
1536         struct mdd_device *m         = mdo2mdd(pobj);
1537         int rc;
1538         ENTRY;
1539
1540         /* EEXIST check */
1541         if (mdd_is_dead_obj(obj))
1542                 RETURN(-ENOENT);
1543
1544         /*
1545          * In some cases this lookup is not needed - we know before if name
1546          * exists or not because MDT performs lookup for it.
1547          * name length check is done in lookup.
1548          */
1549         if (spec->sp_cr_lookup) {
1550                 /*
1551                  * Check if the name already exist, though it will be checked in
1552                  * _index_insert also, for avoiding rolling back if exists
1553                  * _index_insert.
1554                  */
1555                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1556                                          MAY_WRITE | MAY_EXEC);
1557                 if (rc != -ENOENT)
1558                         RETURN(rc ? : -EEXIST);
1559         } else {
1560                 /*
1561                  * Check WRITE permission for the parent.
1562                  * EXEC permission have been checked
1563                  * when lookup before create already.
1564                  */
1565                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1566                                                     MOR_TGT_PARENT);
1567                 if (rc)
1568                         RETURN(rc);
1569         }
1570
1571         /* sgid check */
1572         if (pattr->la_mode & S_ISGID) {
1573                 cattr->la_gid = pattr->la_gid;
1574                 if (S_ISDIR(cattr->la_mode)) {
1575                         cattr->la_mode |= S_ISGID;
1576                         cattr->la_valid |= LA_MODE;
1577                 }
1578         }
1579
1580         switch (cattr->la_mode & S_IFMT) {
1581         case S_IFLNK: {
1582                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1583
1584                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1585                         RETURN(-ENAMETOOLONG);
1586                 else
1587                         RETURN(0);
1588         }
1589         case S_IFDIR:
1590         case S_IFREG:
1591         case S_IFCHR:
1592         case S_IFBLK:
1593         case S_IFIFO:
1594         case S_IFSOCK:
1595                 rc = 0;
1596                 break;
1597         default:
1598                 rc = -EINVAL;
1599                 break;
1600         }
1601         RETURN(rc);
1602 }
1603
1604 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1605                               struct mdd_object *p, struct mdd_object *c,
1606                               const struct lu_name *name,
1607                               struct lu_attr *attr,
1608                               int got_def_acl,
1609                               struct thandle *handle,
1610                               const struct md_op_spec *spec)
1611 {
1612         int rc;
1613
1614         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1615         if (rc)
1616                 GOTO(out, rc);
1617
1618 #ifdef CONFIG_FS_POSIX_ACL
1619         if (got_def_acl > 0) {
1620                 struct lu_buf *acl_buf;
1621
1622                 acl_buf = mdd_buf_get(env, NULL, got_def_acl);
1623                 /* if dir, then can inherit default ACl */
1624                 if (S_ISDIR(attr->la_mode)) {
1625                         rc = mdo_declare_xattr_set(env, c, acl_buf,
1626                                                    XATTR_NAME_ACL_DEFAULT,
1627                                                    0, handle);
1628                         if (rc)
1629                                 GOTO(out, rc);
1630                 }
1631
1632                 rc = mdo_declare_attr_set(env, c, attr, handle);
1633                 if (rc)
1634                         GOTO(out, rc);
1635
1636                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1637                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1638                 if (rc)
1639                         GOTO(out, rc);
1640         }
1641 #endif
1642
1643         if (S_ISDIR(attr->la_mode)) {
1644                 rc = mdo_declare_ref_add(env, p, handle);
1645                 if (rc)
1646                         GOTO(out, rc);
1647         }
1648
1649         rc = mdd_declare_object_initialize(env, p, c, attr, handle);
1650         if (rc)
1651                 GOTO(out, rc);
1652
1653         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1654                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1655         else
1656                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1657                                               name->ln_name, handle);
1658         if (rc)
1659                 GOTO(out, rc);
1660
1661         /* replay case, create LOV EA from client data */
1662         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1663                 const struct lu_buf *buf;
1664
1665                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1666                                         spec->u.sp_ea.eadatalen);
1667                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1668                                            0, handle);
1669                 if (rc)
1670                         GOTO(out, rc);
1671         }
1672
1673         if (S_ISLNK(attr->la_mode)) {
1674                 rc = dt_declare_record_write(env, mdd_object_child(c),
1675                                              strlen(spec->u.sp_symname), 0,
1676                                              handle);
1677                 if (rc)
1678                         GOTO(out, rc);
1679         }
1680
1681         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1682                 rc = mdo_declare_attr_set(env, p, attr, handle);
1683                 if (rc)
1684                         return rc;
1685         }
1686
1687         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1688         if (rc)
1689                 return rc;
1690
1691 out:
1692         return rc;
1693 }
1694
1695 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1696                         struct lu_attr *la, struct lu_buf *acl_buf,
1697                         int *got_def_acl, int *reset_acl)
1698 {
1699         int     rc;
1700         ENTRY;
1701
1702         if (S_ISLNK(la->la_mode))
1703                 RETURN(0);
1704
1705         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
1706         rc = mdo_xattr_get(env, pobj, acl_buf,
1707                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
1708         mdd_read_unlock(env, pobj);
1709         if (rc > 0) {
1710                 /* If there are default ACL, fix mode by default ACL */
1711                 *got_def_acl = rc;
1712                 acl_buf->lb_len = rc;
1713                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
1714                 if (rc < 0)
1715                         RETURN(rc);
1716                 *reset_acl = rc;
1717         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
1718                 /* If there are no default ACL, fix mode by mask */
1719                 struct lu_ucred *uc = lu_ucred(env);
1720                 la->la_mode &= ~uc->uc_umask;
1721                 rc = 0;
1722         }
1723
1724         RETURN(rc);
1725 }
1726
1727 /*
1728  * Create object and insert it into namespace.
1729  */
1730 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
1731                       const struct lu_name *lname, struct md_object *child,
1732                       struct md_op_spec *spec, struct md_attr* ma)
1733 {
1734         struct mdd_thread_info  *info = mdd_env_info(env);
1735         struct lu_attr          *la = &info->mti_la_for_fix;
1736         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
1737         struct mdd_object       *son = md2mdd_obj(child);
1738         struct mdd_device       *mdd = mdo2mdd(pobj);
1739         struct lu_attr          *attr = &ma->ma_attr;
1740         struct thandle          *handle;
1741         struct lu_attr          *pattr = &info->mti_pattr;
1742         struct lu_buf           acl_buf;
1743         struct dynlock_handle   *dlh;
1744         const char              *name = lname->ln_name;
1745         int                      rc, created = 0, initialized = 0, inserted = 0;
1746         int                      got_def_acl = 0;
1747         int                      reset_acl = 0;
1748         ENTRY;
1749
1750         /*
1751          * Two operations have to be performed:
1752          *
1753          *  - an allocation of a new object (->do_create()), and
1754          *
1755          *  - an insertion into a parent index (->dio_insert()).
1756          *
1757          * Due to locking, operation order is not important, when both are
1758          * successful, *but* error handling cases are quite different:
1759          *
1760          *  - if insertion is done first, and following object creation fails,
1761          *  insertion has to be rolled back, but this operation might fail
1762          *  also leaving us with dangling index entry.
1763          *
1764          *  - if creation is done first, is has to be undone if insertion
1765          *  fails, leaving us with leaked space, which is neither good, nor
1766          *  fatal.
1767          *
1768          * It seems that creation-first is simplest solution, but it is
1769          * sub-optimal in the frequent
1770          *
1771          *         $ mkdir foo
1772          *         $ mkdir foo
1773          *
1774          * case, because second mkdir is bound to create object, only to
1775          * destroy it immediately.
1776          *
1777          * To avoid this follow local file systems that do double lookup:
1778          *
1779          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1780          *
1781          *     1. create            (mdd_object_create_internal())
1782          *
1783          *     2. insert            (__mdd_index_insert(), lookup again)
1784          */
1785
1786         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
1787         if (rc != 0)
1788                 RETURN(rc);
1789
1790         /* Sanity checks before big job. */
1791         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
1792         if (rc)
1793                 RETURN(rc);
1794
1795         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
1796                 GOTO(out_free, rc = -EINPROGRESS);
1797
1798         acl_buf.lb_buf = info->mti_xattr_buf;
1799         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
1800         rc = mdd_acl_init(env, mdd_pobj, attr, &acl_buf, &got_def_acl,
1801                           &reset_acl);
1802         if (rc < 0)
1803                 GOTO(out_free, rc);
1804
1805         mdd_object_make_hint(env, mdd_pobj, son, attr);
1806
1807         handle = mdd_trans_create(env, mdd);
1808         if (IS_ERR(handle))
1809                 GOTO(out_free, rc = PTR_ERR(handle));
1810
1811         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
1812                                 got_def_acl, handle, spec);
1813         if (rc)
1814                 GOTO(out_stop, rc);
1815
1816         rc = mdd_trans_start(env, mdd, handle);
1817         if (rc)
1818                 GOTO(out_stop, rc);
1819
1820         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1821         if (dlh == NULL)
1822                 GOTO(out_trans, rc = -ENOMEM);
1823
1824         mdd_write_lock(env, son, MOR_TGT_CHILD);
1825         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
1826         if (rc) {
1827                 mdd_write_unlock(env, son);
1828                 GOTO(cleanup, rc);
1829         }
1830
1831         created = 1;
1832
1833 #ifdef CONFIG_FS_POSIX_ACL
1834         if (got_def_acl) {
1835                 /* set default acl */
1836                 if (S_ISDIR(attr->la_mode)) {
1837                         LASSERTF(acl_buf.lb_len  == got_def_acl,
1838                                  "invalid acl_buf: %p:%d got_def %d\n",
1839                                  acl_buf.lb_buf, (int)acl_buf.lb_len,
1840                                  got_def_acl);
1841                         rc = mdo_xattr_set(env, son, &acl_buf,
1842                                            XATTR_NAME_ACL_DEFAULT, 0,
1843                                            handle, BYPASS_CAPA);
1844                         if (rc) {
1845                                 mdd_write_unlock(env, son);
1846                                 GOTO(cleanup, rc);
1847                         }
1848                 }
1849
1850                 /* set its own acl */
1851                 if (reset_acl) {
1852                         LASSERTF(acl_buf.lb_buf != NULL && acl_buf.lb_len != 0,
1853                                  "invalid acl_buf %p:%d\n", acl_buf.lb_buf,
1854                                  (int)acl_buf.lb_len);
1855                         rc = mdo_xattr_set(env, son, &acl_buf,
1856                                            XATTR_NAME_ACL_ACCESS,
1857                                            0, handle, BYPASS_CAPA);
1858                         if (rc) {
1859                                 mdd_write_unlock(env, son);
1860                                 GOTO(cleanup, rc);
1861                         }
1862                 }
1863         }
1864 #endif
1865
1866         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
1867                                    son, attr, handle, spec);
1868
1869         /*
1870          * in case of replay we just set LOVEA provided by the client
1871          * XXX: I think it would be interesting to try "old" way where
1872          *      MDT calls this xattr_set(LOV) in a different transaction.
1873          *      probably this way we code can be made better.
1874          */
1875         if (rc == 0 && (spec->no_create ||
1876                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
1877                 const struct lu_buf *buf;
1878
1879                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1880                                 spec->u.sp_ea.eadatalen);
1881                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
1882                                 BYPASS_CAPA);
1883         }
1884
1885         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1886                 rc = __mdd_orphan_add(env, son, handle);
1887
1888         mdd_write_unlock(env, son);
1889
1890         if (rc != 0)
1891                 /*
1892                  * Object has no links, so it will be destroyed when last
1893                  * reference is released. (XXX not now.)
1894                  */
1895                 GOTO(cleanup, rc);
1896
1897         initialized = 1;
1898
1899         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
1900                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1901                                         name, S_ISDIR(attr->la_mode), handle,
1902                                         mdd_object_capa(env, mdd_pobj));
1903
1904         if (rc != 0)
1905                 GOTO(cleanup, rc);
1906
1907         inserted = 1;
1908
1909         if (S_ISLNK(attr->la_mode)) {
1910                 struct lu_ucred  *uc = lu_ucred_assert(env);
1911                 struct dt_object *dt = mdd_object_child(son);
1912                 const char *target_name = spec->u.sp_symname;
1913                 int sym_len = strlen(target_name);
1914                 const struct lu_buf *buf;
1915                 loff_t pos = 0;
1916
1917                 buf = mdd_buf_get_const(env, target_name, sym_len);
1918                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1919                                                 mdd_object_capa(env, son),
1920                                                 uc->uc_cap &
1921                                                 CFS_CAP_SYS_RESOURCE_MASK);
1922
1923                 if (rc == sym_len)
1924                         rc = 0;
1925                 else
1926                         GOTO(cleanup, rc = -EFAULT);
1927         }
1928
1929         /* volatile file creation does not update parent directory times */
1930         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1931                 GOTO(cleanup, rc = 0);
1932
1933         /* update parent directory mtime/ctime */
1934         *la = *attr;
1935         la->la_valid = LA_CTIME | LA_MTIME;
1936         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1937         if (rc)
1938                 GOTO(cleanup, rc);
1939
1940         EXIT;
1941 cleanup:
1942         if (rc != 0 && created != 0) {
1943                 int rc2;
1944
1945                 if (inserted != 0) {
1946                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1947                                 rc2 = __mdd_orphan_del(env, son, handle);
1948                         else
1949                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1950                                                          S_ISDIR(attr->la_mode),
1951                                                          handle, BYPASS_CAPA);
1952                         if (rc2 != 0)
1953                                 goto out_stop;
1954                 }
1955
1956                 mdd_write_lock(env, son, MOR_TGT_CHILD);
1957                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
1958                         /* Drop the reference, no need to delete "."/"..",
1959                          * because the object to be destroied directly. */
1960                         rc2 = mdo_ref_del(env, son, handle);
1961                         if (rc2 != 0) {
1962                                 mdd_write_unlock(env, son);
1963                                 goto out_stop;
1964                         }
1965                 }
1966
1967                 rc2 = mdo_ref_del(env, son, handle);
1968                 if (rc2 != 0) {
1969                         mdd_write_unlock(env, son);
1970                         goto out_stop;
1971                 }
1972
1973                 mdo_destroy(env, son, handle);
1974                 mdd_write_unlock(env, son);
1975         }
1976
1977         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1978 out_trans:
1979         if (rc == 0 && fid_is_client_mdt_visible(mdo2fid(son)))
1980                 rc = mdd_changelog_ns_store(env, mdd,
1981                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
1982                         S_ISREG(attr->la_mode) ? CL_CREATE :
1983                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
1984                         0, son, mdd_pobj, lname, handle);
1985 out_stop:
1986         mdd_trans_stop(env, mdd, rc, handle);
1987 out_free:
1988         /* The child object shouldn't be cached anymore */
1989         if (rc)
1990                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1991                             &child->mo_lu.lo_header->loh_flags);
1992         return rc;
1993 }
1994
1995 /*
1996  * Get locks on parents in proper order
1997  * RETURN: < 0 - error, rename_order if successful
1998  */
1999 enum rename_order {
2000         MDD_RN_SAME,
2001         MDD_RN_SRCTGT,
2002         MDD_RN_TGTSRC
2003 };
2004
2005 static int mdd_rename_order(const struct lu_env *env,
2006                             struct mdd_device *mdd,
2007                             struct mdd_object *src_pobj,
2008                             struct mdd_object *tgt_pobj)
2009 {
2010         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2011         int rc;
2012         ENTRY;
2013
2014         if (src_pobj == tgt_pobj)
2015                 RETURN(MDD_RN_SAME);
2016
2017         /* compared the parent child relationship of src_p&tgt_p */
2018         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2019                 rc = MDD_RN_SRCTGT;
2020         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2021                 rc = MDD_RN_TGTSRC;
2022         } else {
2023                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2024                 if (rc == -EREMOTE)
2025                         rc = 0;
2026
2027                 if (rc == 1)
2028                         rc = MDD_RN_TGTSRC;
2029                 else if (rc == 0)
2030                         rc = MDD_RN_SRCTGT;
2031         }
2032
2033         RETURN(rc);
2034 }
2035
2036 /* has not mdd_write{read}_lock on any obj yet. */
2037 static int mdd_rename_sanity_check(const struct lu_env *env,
2038                                    struct mdd_object *src_pobj,
2039                                    struct mdd_object *tgt_pobj,
2040                                    struct mdd_object *sobj,
2041                                    struct mdd_object *tobj,
2042                                    struct lu_attr *so_attr,
2043                                    struct lu_attr *tg_attr)
2044 {
2045         int rc = 0;
2046         ENTRY;
2047
2048         /* XXX: when get here, sobj must NOT be NULL,
2049          * the other case has been processed in cml_rename
2050          * before mdd_rename and enable MDS_PERM_BYPASS. */
2051         LASSERT(sobj);
2052
2053         rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
2054         if (rc)
2055                 RETURN(rc);
2056
2057         /* XXX: when get here, "tobj == NULL" means tobj must
2058          * NOT exist (neither on remote MDS, such case has been
2059          * processed in cml_rename before mdd_rename and enable
2060          * MDS_PERM_BYPASS).
2061          * So check may_create, but not check may_unlink. */
2062         if (!tobj)
2063                 rc = mdd_may_create(env, tgt_pobj, NULL,
2064                                     (src_pobj != tgt_pobj), 0);
2065         else
2066                 rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
2067                                     (src_pobj != tgt_pobj), 1);
2068
2069         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2070             S_ISDIR(so_attr->la_mode))
2071                 rc = __mdd_may_link(env, tgt_pobj);
2072
2073         RETURN(rc);
2074 }
2075
2076 static int mdd_declare_rename(const struct lu_env *env,
2077                               struct mdd_device *mdd,
2078                               struct mdd_object *mdd_spobj,
2079                               struct mdd_object *mdd_tpobj,
2080                               struct mdd_object *mdd_sobj,
2081                               struct mdd_object *mdd_tobj,
2082                               const struct lu_name *tname,
2083                               const struct lu_name *sname,
2084                               struct md_attr *ma,
2085                               struct thandle *handle)
2086 {
2087         int rc;
2088
2089         LASSERT(mdd_spobj);
2090         LASSERT(mdd_tpobj);
2091         LASSERT(mdd_sobj);
2092
2093         /* name from source dir */
2094         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2095         if (rc)
2096                 return rc;
2097
2098         /* .. from source child */
2099         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2100                 /* source child can be directory,
2101                  * counted by source dir's nlink */
2102                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2103                 if (rc)
2104                         return rc;
2105
2106                 rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
2107                 if (rc)
2108                         return rc;
2109
2110                 rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
2111                                               dotdot, handle);
2112                 if (rc)
2113                         return rc;
2114
2115                 /* new target child can be directory,
2116                  * counted by target dir's nlink */
2117                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2118                 if (rc)
2119                         return rc;
2120
2121         }
2122
2123         rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
2124         if (rc)
2125                 return rc;
2126
2127         rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
2128         if (rc)
2129                 return rc;
2130         mdd_declare_links_add(env, mdd_sobj, handle);
2131         if (rc)
2132                 return rc;
2133
2134         rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
2135         if (rc)
2136                 return rc;
2137
2138         /* new name */
2139         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2140                         tname->ln_name, handle);
2141         if (rc)
2142                 return rc;
2143
2144         /* name from target dir (old name), we declare it unconditionally
2145          * as mdd_rename() calls delete unconditionally as well. so just
2146          * to balance declarations vs calls to change ... */
2147         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2148         if (rc)
2149                 return rc;
2150
2151         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2152                 /* delete target child in target parent directory */
2153                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2154                 if (rc)
2155                         return rc;
2156
2157                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2158                         /* target child can be directory,
2159                          * delete "." reference in target child directory */
2160                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2161                         if (rc)
2162                                 return rc;
2163
2164                         /* delete ".." reference in target parent directory */
2165                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2166                         if (rc)
2167                                 return rc;
2168                 }
2169
2170                 rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
2171                 if (rc)
2172                         return rc;
2173
2174                 mdd_declare_links_add(env, mdd_tobj, handle);
2175                 if (rc)
2176                         return rc;
2177
2178                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2179                 if (rc)
2180                         return rc;
2181         }
2182
2183         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2184         if (rc)
2185                 return rc;
2186
2187         return rc;
2188 }
2189
2190 /* src object can be remote that is why we use only fid and type of object */
2191 static int mdd_rename(const struct lu_env *env,
2192                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2193                       const struct lu_fid *lf, const struct lu_name *lsname,
2194                       struct md_object *tobj, const struct lu_name *ltname,
2195                       struct md_attr *ma)
2196 {
2197         const char *sname = lsname->ln_name;
2198         const char *tname = ltname->ln_name;
2199         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2200         struct lu_attr    *so_attr = &mdd_env_info(env)->mti_cattr;
2201         struct lu_attr    *tg_attr = &mdd_env_info(env)->mti_pattr;
2202         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2203         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2204         struct mdd_device *mdd = mdo2mdd(src_pobj);
2205         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2206         struct mdd_object *mdd_tobj = NULL;
2207         struct dynlock_handle *sdlh, *tdlh;
2208         struct thandle *handle;
2209         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2210         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2211         bool is_dir;
2212         bool tobj_ref = 0;
2213         bool tobj_locked = 0;
2214         unsigned cl_flags = 0;
2215         int rc, rc2;
2216         ENTRY;
2217
2218         if (tobj)
2219                 mdd_tobj = md2mdd_obj(tobj);
2220
2221         mdd_sobj = mdd_object_find(env, mdd, lf);
2222
2223         handle = mdd_trans_create(env, mdd);
2224         if (IS_ERR(handle))
2225                 GOTO(out_pending, rc = PTR_ERR(handle));
2226
2227         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2228                                 mdd_tobj, lsname, ltname, ma, handle);
2229         if (rc)
2230                 GOTO(stop, rc);
2231
2232         rc = mdd_trans_start(env, mdd, handle);
2233         if (rc)
2234                 GOTO(stop, rc);
2235
2236         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2237         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2238         if (rc < 0)
2239                 GOTO(cleanup_unlocked, rc);
2240
2241         /* Get locks in determined order */
2242         if (rc == MDD_RN_SAME) {
2243                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2244                                           sname, MOR_SRC_PARENT);
2245                 /* check hashes to determine do we need one lock or two */
2246                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2247                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2248                                 MOR_TGT_PARENT);
2249                 else
2250                         tdlh = sdlh;
2251         } else if (rc == MDD_RN_SRCTGT) {
2252                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2253                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2254         } else {
2255                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2256                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2257         }
2258         if (sdlh == NULL || tdlh == NULL)
2259                 GOTO(cleanup, rc = -ENOMEM);
2260
2261         rc = mdd_la_get(env, mdd_sobj, so_attr,
2262                         mdd_object_capa(env, mdd_sobj));
2263         if (rc)
2264                 GOTO(cleanup, rc);
2265
2266         if (mdd_tobj) {
2267                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2268                                 mdd_object_capa(env, mdd_tobj));
2269                 if (rc)
2270                         GOTO(cleanup, rc);
2271         }
2272
2273         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj,
2274                                      mdd_tobj, so_attr, tg_attr);
2275         if (rc)
2276                 GOTO(cleanup, rc);
2277
2278         is_dir = S_ISDIR(so_attr->la_mode);
2279
2280         /* Remove source name from source directory */
2281         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2282                                 mdd_object_capa(env, mdd_spobj));
2283         if (rc)
2284                 GOTO(cleanup, rc);
2285
2286         /* "mv dir1 dir2" needs "dir1/.." link update */
2287         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2288                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2289                                         mdd_object_capa(env, mdd_sobj));
2290                 if (rc)
2291                         GOTO(fixup_spobj2, rc);
2292
2293                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2294                                       handle, mdd_object_capa(env, mdd_sobj));
2295                 if (rc)
2296                         GOTO(fixup_spobj, rc);
2297         }
2298
2299         /* Remove target name from target directory
2300          * Here tobj can be remote one, so we do index_delete unconditionally
2301          * and -ENOENT is allowed.
2302          */
2303         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2304                                 mdd_object_capa(env, mdd_tpobj));
2305         if (rc != 0) {
2306                 if (mdd_tobj) {
2307                         /* tname might been renamed to something else */
2308                         GOTO(fixup_spobj, rc);
2309                 }
2310                 if (rc != -ENOENT)
2311                         GOTO(fixup_spobj, rc);
2312         }
2313
2314         /* Insert new fid with target name into target dir */
2315         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2316                                 mdd_object_capa(env, mdd_tpobj));
2317         if (rc)
2318                 GOTO(fixup_tpobj, rc);
2319
2320         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2321         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2322
2323         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2324         if (mdd_sobj) {
2325                 la->la_valid = LA_CTIME;
2326                 rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
2327                 if (rc)
2328                         GOTO(fixup_tpobj, rc);
2329         }
2330
2331         /* Remove old target object
2332          * For tobj is remote case cmm layer has processed
2333          * and set tobj to NULL then. So when tobj is NOT NULL,
2334          * it must be local one.
2335          */
2336         if (tobj && mdd_object_exists(mdd_tobj)) {
2337                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2338                 tobj_locked = 1;
2339                 if (mdd_is_dead_obj(mdd_tobj)) {
2340                         /* shld not be dead, something is wrong */
2341                         CERROR("tobj is dead, something is wrong\n");
2342                         rc = -EINVAL;
2343                         goto cleanup;
2344                 }
2345                 mdo_ref_del(env, mdd_tobj, handle);
2346
2347                 /* Remove dot reference. */
2348                 if (S_ISDIR(tg_attr->la_mode))
2349                         mdo_ref_del(env, mdd_tobj, handle);
2350                 tobj_ref = 1;
2351
2352                 /* fetch updated nlink */
2353                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2354                                 mdd_object_capa(env, mdd_tobj));
2355                 if (rc != 0) {
2356                         CERROR("%s: Failed to get nlink for tobj "
2357                                 DFID": rc = %d\n",
2358                                 mdd2obd_dev(mdd)->obd_name,
2359                                 PFID(tpobj_fid), rc);
2360                         GOTO(fixup_tpobj, rc);
2361                 }
2362
2363                 la->la_valid = LA_CTIME;
2364                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2365                 if (rc != 0) {
2366                         CERROR("%s: Failed to set ctime for tobj "
2367                                 DFID": rc = %d\n",
2368                                 mdd2obd_dev(mdd)->obd_name,
2369                                 PFID(tpobj_fid), rc);
2370                         GOTO(fixup_tpobj, rc);
2371                 }
2372
2373                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2374                 ma->ma_attr = *tg_attr;
2375                 ma->ma_valid |= MA_INODE;
2376                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2377                 if (rc != 0) {
2378                         CERROR("%s: Failed to unlink tobj "
2379                                 DFID": rc = %d\n",
2380                                 mdd2obd_dev(mdd)->obd_name,
2381                                 PFID(tpobj_fid), rc);
2382                         GOTO(fixup_tpobj, rc);
2383                 }
2384
2385                 /* fetch updated nlink */
2386                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2387                                 mdd_object_capa(env, mdd_tobj));
2388                 if (rc != 0) {
2389                         CERROR("%s: Failed to get nlink for tobj "
2390                                 DFID": rc = %d\n",
2391                                 mdd2obd_dev(mdd)->obd_name,
2392                                 PFID(tpobj_fid), rc);
2393                         GOTO(fixup_tpobj, rc);
2394                 }
2395                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2396                 ma->ma_attr = *tg_attr;
2397                 ma->ma_valid |= MA_INODE;
2398
2399                 if (so_attr->la_nlink == 0)
2400                         cl_flags |= CLF_RENAME_LAST;
2401         }
2402
2403         la->la_valid = LA_CTIME | LA_MTIME;
2404         rc = mdd_attr_check_set_internal(env, mdd_spobj, la, handle, 0);
2405         if (rc)
2406                 GOTO(fixup_tpobj, rc);
2407
2408         if (mdd_spobj != mdd_tpobj) {
2409                 la->la_valid = LA_CTIME | LA_MTIME;
2410                 rc = mdd_attr_check_set_internal(env, mdd_tpobj, la,
2411                                                  handle, 0);
2412         }
2413
2414         if (rc == 0 && mdd_sobj) {
2415                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2416                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2417                                       mdo2fid(mdd_tpobj), ltname, handle, 0, 0);
2418                 if (rc == -ENOENT)
2419                         /* Old files might not have EA entry */
2420                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2421                                       lsname, handle, 0);
2422                 mdd_write_unlock(env, mdd_sobj);
2423                 /* We don't fail the transaction if the link ea can't be
2424                    updated -- fid2path will use alternate lookup method. */
2425                 rc = 0;
2426         }
2427
2428         EXIT;
2429
2430 fixup_tpobj:
2431         if (rc) {
2432                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2433                                          BYPASS_CAPA);
2434                 if (rc2)
2435                         CWARN("tp obj fix error %d\n",rc2);
2436
2437                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2438                     !mdd_is_dead_obj(mdd_tobj)) {
2439                         if (tobj_ref) {
2440                                 mdo_ref_add(env, mdd_tobj, handle);
2441                                 if (is_dir)
2442                                         mdo_ref_add(env, mdd_tobj, handle);
2443                         }
2444
2445                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2446                                          mdo2fid(mdd_tobj), tname,
2447                                          is_dir, handle,
2448                                          BYPASS_CAPA);
2449
2450                         if (rc2)
2451                                 CWARN("tp obj fix error %d\n",rc2);
2452                 }
2453         }
2454
2455 fixup_spobj:
2456         if (rc && is_dir && mdd_sobj) {
2457                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2458                                               BYPASS_CAPA);
2459
2460                 if (rc2)
2461                         CWARN("sp obj dotdot delete error %d\n",rc2);
2462
2463
2464                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2465                                               dotdot, handle, BYPASS_CAPA);
2466                 if (rc2)
2467                         CWARN("sp obj dotdot insert error %d\n",rc2);
2468         }
2469
2470 fixup_spobj2:
2471         if (rc) {
2472                 rc2 = __mdd_index_insert(env, mdd_spobj,
2473                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2474                 if (rc2)
2475                         CWARN("sp obj fix error %d\n",rc2);
2476         }
2477 cleanup:
2478         if (tobj_locked)
2479                 mdd_write_unlock(env, mdd_tobj);
2480         if (likely(tdlh) && sdlh != tdlh)
2481                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2482         if (likely(sdlh))
2483                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2484 cleanup_unlocked:
2485         if (rc == 0)
2486                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2487                                                 mdd_tobj, tpobj_fid, lf,
2488                                                 spobj_fid, ltname, lsname,
2489                                                 handle);
2490
2491 stop:
2492         mdd_trans_stop(env, mdd, rc, handle);
2493 out_pending:
2494         mdd_object_put(env, mdd_sobj);
2495         return rc;
2496 }
2497
2498 int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata)
2499 {
2500         ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2501         if (ldata->ml_buf->lb_buf == NULL)
2502                 return -ENOMEM;
2503         ldata->ml_leh = ldata->ml_buf->lb_buf;
2504         ldata->ml_leh->leh_magic = LINK_EA_MAGIC;
2505         ldata->ml_leh->leh_len = sizeof(struct link_ea_header);
2506         ldata->ml_leh->leh_reccount = 0;
2507         return 0;
2508 }
2509
2510 /** Read the link EA into a temp buffer.
2511  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
2512  * A pointer to the buffer is stored in \a ldata::ml_buf.
2513  *
2514  * \retval 0 or error
2515  */
2516 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
2517                    struct mdd_link_data *ldata)
2518 {
2519         struct lustre_capa *capa;
2520         struct link_ea_header *leh;
2521         int rc;
2522
2523         /* First try a small buf */
2524         LASSERT(env != NULL);
2525         ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
2526         if (ldata->ml_buf->lb_buf == NULL)
2527                 return -ENOMEM;
2528
2529         if (!mdd_object_exists(mdd_obj))
2530                 return -ENODATA;
2531
2532         capa = mdd_object_capa(env, mdd_obj);
2533         rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
2534                            XATTR_NAME_LINK, capa);
2535         if (rc == -ERANGE) {
2536                 /* Buf was too small, figure out what we need. */
2537                 mdd_buf_put(ldata->ml_buf);
2538                 rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
2539                                    XATTR_NAME_LINK, capa);
2540                 if (rc < 0)
2541                         return rc;
2542                 ldata->ml_buf = mdd_buf_alloc(env, rc);
2543                 if (ldata->ml_buf->lb_buf == NULL)
2544                         return -ENOMEM;
2545                 rc = mdo_xattr_get(env, mdd_obj, &LU_BUF_NULL,
2546                                    XATTR_NAME_LINK, capa);
2547         }
2548         if (rc < 0)
2549                 return rc;
2550
2551         leh = ldata->ml_buf->lb_buf;
2552         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
2553                 leh->leh_magic = LINK_EA_MAGIC;
2554                 leh->leh_reccount = __swab32(leh->leh_reccount);
2555                 leh->leh_len = __swab64(leh->leh_len);
2556                 /* entries are swabbed by mdd_lee_unpack */
2557         }
2558         if (leh->leh_magic != LINK_EA_MAGIC)
2559                 return -EINVAL;
2560         if (leh->leh_reccount == 0)
2561                 return -ENODATA;
2562
2563         ldata->ml_leh = leh;
2564         return 0;
2565 }
2566
2567 /** Read the link EA into a temp buffer.
2568  * Uses the name_buf since it is generally large.
2569  * \retval IS_ERR err
2570  * \retval ptr to \a lu_buf (always \a mti_big_buf)
2571  */
2572 struct lu_buf *mdd_links_get(const struct lu_env *env,
2573                              struct mdd_object *mdd_obj)
2574 {
2575         struct mdd_link_data ldata = { 0 };
2576         int rc;
2577
2578         rc = mdd_links_read(env, mdd_obj, &ldata);
2579         return rc ? ERR_PTR(rc) : ldata.ml_buf;
2580 }
2581
2582 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
2583                     struct mdd_link_data *ldata, struct thandle *handle)
2584 {
2585         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf,
2586                                                      ldata->ml_leh->leh_len);
2587         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
2588                              mdd_object_capa(env, mdd_obj));
2589 }
2590
2591 /** Pack a link_ea_entry.
2592  * All elements are stored as chars to avoid alignment issues.
2593  * Numbers are always big-endian
2594  * \retval record length
2595  */
2596 static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
2597                         const struct lu_fid *pfid)
2598 {
2599         struct lu_fid   tmpfid;
2600         int             reclen;
2601
2602         fid_cpu_to_be(&tmpfid, pfid);
2603         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
2604                 tmpfid.f_ver = ~0;
2605         memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
2606         memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
2607         reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
2608
2609         lee->lee_reclen[0] = (reclen >> 8) & 0xff;
2610         lee->lee_reclen[1] = reclen & 0xff;
2611         return reclen;
2612 }
2613
2614 void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
2615                     struct lu_name *lname, struct lu_fid *pfid)
2616 {
2617         *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
2618         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
2619         fid_be_to_cpu(pfid, pfid);
2620         lname->ln_name = lee->lee_name;
2621         lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
2622 }
2623
2624 int mdd_declare_links_add(const struct lu_env *env,
2625                           struct mdd_object *mdd_obj,
2626                           struct thandle *handle)
2627 {
2628         int rc;
2629
2630         /* XXX: max size? */
2631         rc = mdo_declare_xattr_set(env, mdd_obj,
2632                              mdd_buf_get_const(env, NULL, 4096),
2633                              XATTR_NAME_LINK, 0, handle);
2634
2635         return rc;
2636 }
2637
2638 /** Add a record to the end of link ea buf */
2639 int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
2640                       const struct lu_name *lname, const struct lu_fid *pfid)
2641 {
2642         LASSERT(ldata->ml_leh != NULL);
2643
2644         if (lname == NULL || pfid == NULL)
2645                 return -EINVAL;
2646
2647         ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
2648         if (ldata->ml_leh->leh_len + ldata->ml_reclen >
2649             ldata->ml_buf->lb_len) {
2650                 if (mdd_buf_grow(env, ldata->ml_leh->leh_len +
2651                                  ldata->ml_reclen) < 0)
2652                         return -ENOMEM;
2653         }
2654
2655         ldata->ml_leh = ldata->ml_buf->lb_buf;
2656         ldata->ml_lee = ldata->ml_buf->lb_buf + ldata->ml_leh->leh_len;
2657         ldata->ml_reclen = mdd_lee_pack(ldata->ml_lee, lname, pfid);
2658         ldata->ml_leh->leh_len += ldata->ml_reclen;
2659         ldata->ml_leh->leh_reccount++;
2660         CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n",
2661                lname->ln_namelen, lname->ln_name);
2662         return 0;
2663 }
2664
2665 /** Del the current record from the link ea buf */
2666 void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
2667                        const struct lu_name *lname)
2668 {
2669         LASSERT(ldata->ml_leh != NULL);
2670
2671         ldata->ml_leh->leh_reccount--;
2672         ldata->ml_leh->leh_len -= ldata->ml_reclen;
2673         memmove(ldata->ml_lee, (char *)ldata->ml_lee + ldata->ml_reclen,
2674                 (char *)ldata->ml_leh + ldata->ml_leh->leh_len -
2675                 (char *)ldata->ml_lee);
2676         CDEBUG(D_INODE, "Old link_ea name '%.*s' is removed\n",
2677                lname->ln_namelen, lname->ln_name);
2678
2679 }
2680
2681 /**
2682  * Check if such a link exists in linkEA.
2683  *
2684  * \param mdd_obj object being handled
2685  * \param pfid parent fid the link to be found for
2686  * \param lname name in the parent's directory entry pointing to this object
2687  * \param ldata link data the search to be done on
2688  *
2689  * \retval   0 success
2690  * \retval -ENOENT link does not exist
2691  * \retval -ve on error
2692  */
2693 int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
2694                    struct mdd_link_data *ldata, const struct lu_name *lname,
2695                    const struct lu_fid  *pfid)
2696 {
2697         struct lu_name *tmpname = &mdd_env_info(env)->mti_name2;
2698         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
2699         int count;
2700
2701         LASSERT(ldata->ml_leh != NULL);
2702
2703         /* link #0 */
2704         ldata->ml_lee = (struct link_ea_entry *)(ldata->ml_leh + 1);
2705
2706         for (count = 0; count < ldata->ml_leh->leh_reccount; count++) {
2707                 mdd_lee_unpack(ldata->ml_lee, &ldata->ml_reclen,
2708                                tmpname, tmpfid);
2709                 if (tmpname->ln_namelen == lname->ln_namelen &&
2710                     lu_fid_eq(tmpfid, pfid) &&
2711                     (strncmp(tmpname->ln_name, lname->ln_name,
2712                              tmpname->ln_namelen) == 0))
2713                         break;
2714                 ldata->ml_lee = (struct link_ea_entry *)((char *)ldata->ml_lee +
2715                                                          ldata->ml_reclen);
2716         }
2717
2718         if (count == ldata->ml_leh->leh_reccount) {
2719                 CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
2720                        lname->ln_namelen, lname->ln_name);
2721                 return -ENOENT;
2722         }
2723         return 0;
2724 }
2725
2726 static int __mdd_links_add(const struct lu_env *env,
2727                            struct mdd_object *mdd_obj,
2728                            struct mdd_link_data *ldata,
2729                            const struct lu_name *lname,
2730                            const struct lu_fid *pfid,
2731                            int first, int check)
2732 {
2733         int rc;
2734
2735         if (ldata->ml_leh == NULL) {
2736                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
2737                 if (rc) {
2738                         if (rc != -ENODATA)
2739                                 return rc;
2740                         rc = mdd_links_new(env, ldata);
2741                         if (rc)
2742                                 return rc;
2743                 }
2744         }
2745
2746         if (check) {
2747                 rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
2748                 if (rc && rc != -ENOENT)
2749                         return rc;
2750                 if (rc == 0)
2751                         return -EEXIST;
2752         }
2753
2754         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
2755                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
2756
2757                 *tfid = *pfid;
2758                 tfid->f_ver = ~0;
2759                 mdd_links_add_buf(env, ldata, lname, tfid);
2760         }
2761
2762         return mdd_links_add_buf(env, ldata, lname, pfid);
2763 }
2764
2765 static int __mdd_links_del(const struct lu_env *env,
2766                            struct mdd_object *mdd_obj,
2767                            struct mdd_link_data *ldata,
2768                            const struct lu_name *lname,
2769                            const struct lu_fid *pfid)
2770 {
2771         int rc;
2772
2773         if (ldata->ml_leh == NULL) {
2774                 rc = mdd_links_read(env, mdd_obj, ldata);
2775                 if (rc)
2776                         return rc;
2777         }
2778
2779         rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
2780         if (rc)
2781                 return rc;
2782
2783         mdd_links_del_buf(env, ldata, lname);
2784         return 0;
2785 }
2786
2787 int mdd_links_rename(const struct lu_env *env,
2788                      struct mdd_object *mdd_obj,
2789                      const struct lu_fid *oldpfid,
2790                      const struct lu_name *oldlname,
2791                      const struct lu_fid *newpfid,
2792                      const struct lu_name *newlname,
2793                      struct thandle *handle,
2794                      int first, int check)
2795 {
2796         struct mdd_link_data ldata = { 0 };
2797         int updated = 0;
2798         int rc2 = 0;
2799         int rc = 0;
2800         ENTRY;
2801
2802         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
2803                 return 0;
2804
2805         LASSERT(oldpfid != NULL || newpfid != NULL);
2806
2807         if (mdd_obj->mod_flags & DEAD_OBJ)
2808                 /* No more links, don't bother */
2809                 RETURN(0);
2810
2811         if (oldpfid != NULL) {
2812                 rc = __mdd_links_del(env, mdd_obj, &ldata,
2813                                      oldlname, oldpfid);
2814                 if (rc) {
2815                         if ((check == 0) ||
2816                             (rc != -ENODATA && rc != -ENOENT))
2817                                 GOTO(out, rc);
2818                         /* No changes done. */
2819                         rc = 0;
2820                 } else {
2821                         updated = 1;
2822                 }
2823         }
2824
2825         /* If renaming, add the new record */
2826         if (newpfid != NULL) {
2827                 /* even if the add fails, we still delete the out-of-date
2828                  * old link */
2829                 rc2 = __mdd_links_add(env, mdd_obj, &ldata,
2830                                       newlname, newpfid, first, check);
2831                 if (rc2 == -EEXIST)
2832                         rc2 = 0;
2833                 else if (rc2 == 0)
2834                         updated = 1;
2835         }
2836
2837         if (updated)
2838                 rc = mdd_links_write(env, mdd_obj, &ldata, handle);
2839         EXIT;
2840 out:
2841         if (rc == 0)
2842                 rc = rc2;
2843         if (rc) {
2844                 int error = 1;
2845                 if (rc == -EOVERFLOW || rc == - ENOENT)
2846                         error = 0;
2847                 if (oldpfid == NULL)
2848                         CDEBUG(error ? D_ERROR : D_OTHER,
2849                                "link_ea add '%.*s' failed %d "DFID"\n",
2850                                newlname->ln_namelen, newlname->ln_name,
2851                                rc, PFID(mdd_object_fid(mdd_obj)));
2852                 else if (newpfid == NULL)
2853                         CDEBUG(error ? D_ERROR : D_OTHER,
2854                                "link_ea del '%.*s' failed %d "DFID"\n",
2855                                oldlname->ln_namelen, oldlname->ln_name,
2856                                rc, PFID(mdd_object_fid(mdd_obj)));
2857                 else
2858                         CDEBUG(error ? D_ERROR : D_OTHER,
2859                                "link_ea rename '%.*s'->'%.*s' failed %d "
2860                                DFID"\n",
2861                                oldlname->ln_namelen, oldlname->ln_name,
2862                                newlname->ln_namelen, newlname->ln_name,
2863                                rc, PFID(mdd_object_fid(mdd_obj)));
2864         }
2865
2866         if (ldata.ml_buf && ldata.ml_buf->lb_len > OBD_ALLOC_BIG)
2867                 /* if we vmalloced a large buffer drop it */
2868                 mdd_buf_put(ldata.ml_buf);
2869
2870         return rc;
2871 }
2872
2873 static inline int mdd_links_add(const struct lu_env *env,
2874                                 struct mdd_object *mdd_obj,
2875                                 const struct lu_fid *pfid,
2876                                 const struct lu_name *lname,
2877                                 struct thandle *handle, int first)
2878 {
2879         return mdd_links_rename(env, mdd_obj, NULL, NULL,
2880                                 pfid, lname, handle, first, 0);
2881 }
2882
2883 static inline int mdd_links_del(const struct lu_env *env,
2884                                 struct mdd_object *mdd_obj,
2885                                 const struct lu_fid *pfid,
2886                                 const struct lu_name *lname,
2887                                 struct thandle *handle)
2888 {
2889         return mdd_links_rename(env, mdd_obj, pfid, lname,
2890                                 NULL, NULL, handle, 0, 0);
2891 }
2892
2893 const struct md_dir_operations mdd_dir_ops = {
2894         .mdo_is_subdir     = mdd_is_subdir,
2895         .mdo_lookup        = mdd_lookup,
2896         .mdo_create        = mdd_create,
2897         .mdo_rename        = mdd_rename,
2898         .mdo_link          = mdd_link,
2899         .mdo_unlink        = mdd_unlink,
2900         .mdo_create_data   = mdd_create_data,
2901 };