Whamcloud - gitweb
46c11607e5327695b11d747c1caefc5351ec8afd
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 static int __mdd_lookup(const struct lu_env *, struct md_object *,
61                         const struct lu_name *, struct lu_fid*, int);
62
63 static int
64 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
65                     const struct lu_name *lname, struct lu_fid* fid, int mask)
66 {
67         const char *name = lname->ln_name;
68         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
69         struct dynlock_handle *dlh;
70         int rc;
71
72         dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
73         if (unlikely(dlh == NULL))
74                 return -ENOMEM;
75         rc = __mdd_lookup(env, pobj, lname, fid, mask);
76         mdd_pdo_read_unlock(env, mdd_obj, dlh);
77
78         return rc;
79 }
80
81 int mdd_lookup(const struct lu_env *env,
82                struct md_object *pobj, const struct lu_name *lname,
83                struct lu_fid* fid, struct md_op_spec *spec)
84 {
85         int rc;
86         ENTRY;
87         rc = __mdd_lookup_locked(env, pobj, lname, fid, MAY_EXEC);
88         RETURN(rc);
89 }
90
91 int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
92                    struct lu_fid *fid)
93 {
94         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
95 }
96
97 /*
98  * For root fid use special function, which does not compare version component
99  * of fid. Version component is different for root fids on all MDTs.
100  */
101 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
102 {
103         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
104                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
105 }
106
107 /*
108  * return 1: if lf is the fid of the ancestor of p1;
109  * return 0: if not;
110  *
111  * return -EREMOTE: if remote object is found, in this
112  * case fid of remote object is saved to @pf;
113  *
114  * otherwise: values < 0, errors.
115  */
116 static int mdd_is_parent(const struct lu_env *env,
117                          struct mdd_device *mdd,
118                          struct mdd_object *p1,
119                          const struct lu_fid *lf,
120                          struct lu_fid *pf)
121 {
122         struct mdd_object *parent = NULL;
123         struct lu_fid *pfid;
124         int rc;
125         ENTRY;
126
127         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
128         pfid = &mdd_env_info(env)->mti_fid;
129
130         /* Check for root first. */
131         if (mdd_is_root(mdd, mdo2fid(p1)))
132                 RETURN(0);
133
134         for(;;) {
135                 /* this is done recursively, bypass capa for each obj */
136                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
137                 rc = mdd_parent_fid(env, p1, pfid);
138                 if (rc)
139                         GOTO(out, rc);
140                 if (mdd_is_root(mdd, pfid))
141                         GOTO(out, rc = 0);
142                 if (lu_fid_eq(pfid, lf))
143                         GOTO(out, rc = 1);
144                 if (parent)
145                         mdd_object_put(env, parent);
146
147                 parent = mdd_object_find(env, mdd, pfid);
148                 if (IS_ERR(parent)) {
149                         GOTO(out, rc = PTR_ERR(parent));
150                 } else if (mdd_object_remote(parent)) {
151                         /*FIXME: Because of the restriction of rename in Phase I.
152                          * If the parent is remote, we just assumed lf is not the
153                          * parent of P1 for now */
154                         GOTO(out, rc = 0);
155                 }
156                 p1 = parent;
157         }
158         EXIT;
159 out:
160         if (parent && !IS_ERR(parent))
161                 mdd_object_put(env, parent);
162         return rc;
163 }
164
165 /*
166  * No permission check is needed.
167  *
168  * returns 1: if fid is ancestor of @mo;
169  * returns 0: if fid is not a ancestor of @mo;
170  *
171  * returns EREMOTE if remote object is found, fid of remote object is saved to
172  * @fid;
173  *
174  * returns < 0: if error
175  */
176 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
177                   const struct lu_fid *fid, struct lu_fid *sfid)
178 {
179         struct mdd_device *mdd = mdo2mdd(mo);
180         int rc;
181         ENTRY;
182
183         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
184                 RETURN(0);
185
186         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
187         if (rc == 0) {
188                 /* found root */
189                 fid_zero(sfid);
190         } else if (rc == 1) {
191                 /* found @fid is parent */
192                 *sfid = *fid;
193                 rc = 0;
194         }
195         RETURN(rc);
196 }
197
198 /*
199  * Check that @dir contains no entries except (possibly) dot and dotdot.
200  *
201  * Returns:
202  *
203  *             0        empty
204  *      -ENOTDIR        not a directory object
205  *    -ENOTEMPTY        not empty
206  *           -ve        other error
207  *
208  */
209 static int mdd_dir_is_empty(const struct lu_env *env,
210                             struct mdd_object *dir)
211 {
212         struct dt_it     *it;
213         struct dt_object *obj;
214         const struct dt_it_ops *iops;
215         int result;
216         ENTRY;
217
218         obj = mdd_object_child(dir);
219         if (!dt_try_as_dir(env, obj))
220                 RETURN(-ENOTDIR);
221
222         iops = &obj->do_index_ops->dio_it;
223         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
224         if (!IS_ERR(it)) {
225                 result = iops->get(env, it, (const void *)"");
226                 if (result > 0) {
227                         int i;
228                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
229                                 result = iops->next(env, it);
230                         if (result == 0)
231                                 result = -ENOTEMPTY;
232                         else if (result == +1)
233                                 result = 0;
234                 } else if (result == 0)
235                         /*
236                          * Huh? Index contains no zero key?
237                          */
238                         result = -EIO;
239
240                 iops->put(env, it);
241                 iops->fini(env, it);
242         } else
243                 result = PTR_ERR(it);
244         RETURN(result);
245 }
246
247 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
248 {
249         struct mdd_device *m = mdd_obj2mdd_dev(obj);
250         struct lu_attr *la = &mdd_env_info(env)->mti_la;
251         int rc;
252         ENTRY;
253
254         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
255         if (rc)
256                 RETURN(rc);
257
258         /*
259          * Subdir count limitation can be broken through.
260          */
261         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
262             !S_ISDIR(la->la_mode))
263                 RETURN(-EMLINK);
264         else
265                 RETURN(0);
266 }
267
268 /*
269  * Check whether it may create the cobj under the pobj.
270  * cobj maybe NULL
271  */
272 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
273                    struct mdd_object *cobj, int check_perm, int check_nlink)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (cobj && mdd_object_exists(cobj))
279                 RETURN(-EEXIST);
280
281         if (mdd_is_dead_obj(pobj))
282                 RETURN(-ENOENT);
283
284         if (check_perm)
285                 rc = mdd_permission_internal_locked(env, pobj, NULL,
286                                                     MAY_WRITE | MAY_EXEC,
287                                                     MOR_TGT_PARENT);
288         if (!rc && check_nlink)
289                 rc = __mdd_may_link(env, pobj);
290
291         RETURN(rc);
292 }
293
294 /*
295  * Check whether can unlink from the pobj in the case of "cobj == NULL".
296  */
297 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
298                    const struct lu_attr *attr)
299 {
300         int rc;
301         ENTRY;
302
303         if (mdd_is_dead_obj(pobj))
304                 RETURN(-ENOENT);
305
306         if ((attr->la_valid & LA_FLAGS) &&
307             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
308                 RETURN(-EPERM);
309
310         rc = mdd_permission_internal_locked(env, pobj, NULL,
311                                             MAY_WRITE | MAY_EXEC,
312                                             MOR_TGT_PARENT);
313         if (rc)
314                 RETURN(rc);
315
316         if (mdd_is_append(pobj))
317                 RETURN(-EPERM);
318
319         RETURN(rc);
320 }
321
322 /*
323  * pobj == NULL is remote ops case, under such case, pobj's
324  * VTX feature has been checked already, no need check again.
325  */
326 static inline int mdd_is_sticky(const struct lu_env *env,
327                                 struct mdd_object *pobj,
328                                 struct mdd_object *cobj)
329 {
330         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
331         struct lu_ucred *uc = lu_ucred_assert(env);
332         int rc;
333
334         if (pobj) {
335                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
336                 if (rc)
337                         return rc;
338
339                 if (!(tmp_la->la_mode & S_ISVTX) ||
340                     (tmp_la->la_uid == uc->uc_fsuid))
341                         return 0;
342         }
343
344         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
345         if (rc)
346                 return rc;
347
348         if (tmp_la->la_uid == uc->uc_fsuid)
349                 return 0;
350
351         return !md_capable(uc, CFS_CAP_FOWNER);
352 }
353
354 static int mdd_may_delete_entry(const struct lu_env *env,
355                                 struct mdd_object *pobj, int check_perm)
356 {
357         ENTRY;
358
359         LASSERT(pobj != NULL);
360         if (!mdd_object_exists(pobj))
361                 RETURN(-ENOENT);
362
363         if (mdd_is_dead_obj(pobj))
364                 RETURN(-ENOENT);
365
366         if (check_perm) {
367                 int rc;
368                 rc = mdd_permission_internal_locked(env, pobj, NULL,
369                                             MAY_WRITE | MAY_EXEC,
370                                             MOR_TGT_PARENT);
371                 if (rc)
372                         RETURN(rc);
373         }
374
375         if (mdd_is_append(pobj))
376                 RETURN(-EPERM);
377
378         RETURN(0);
379 }
380
381 /*
382  * Check whether it may delete the cobj from the pobj.
383  * pobj maybe NULL
384  */
385 int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
386                    struct mdd_object *cobj, struct lu_attr *cattr,
387                    struct lu_attr *src_attr, int check_perm, int check_empty)
388 {
389         int rc = 0;
390         ENTRY;
391
392         if (pobj) {
393                 rc = mdd_may_delete_entry(env, pobj, check_perm);
394                 if (rc != 0)
395                         RETURN(rc);
396         }
397
398         if (cobj == NULL)
399                 RETURN(0);
400
401         if (!mdd_object_exists(cobj))
402                 RETURN(-ENOENT);
403
404         if (mdd_is_dead_obj(cobj))
405                 RETURN(-ESTALE);
406
407
408         if (mdd_is_sticky(env, pobj, cobj))
409                 RETURN(-EPERM);
410
411         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
412                 RETURN(-EPERM);
413
414         if ((cattr->la_valid & LA_FLAGS) &&
415             (cattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
416                 RETURN(-EPERM);
417
418         /* additional check the rename case */
419         if (src_attr) {
420                 if (S_ISDIR(src_attr->la_mode)) {
421                         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
422
423                         if (!S_ISDIR(cattr->la_mode))
424                                 RETURN(-ENOTDIR);
425
426                         if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
427                                 RETURN(-EBUSY);
428                 } else if (S_ISDIR(cattr->la_mode))
429                         RETURN(-EISDIR);
430         }
431
432         if (S_ISDIR(cattr->la_mode) && check_empty)
433                 rc = mdd_dir_is_empty(env, cobj);
434
435         RETURN(rc);
436 }
437
438 /*
439  * tgt maybe NULL
440  * has mdd_write_lock on src already, but not on tgt yet
441  */
442 int mdd_link_sanity_check(const struct lu_env *env,
443                           struct mdd_object *tgt_obj,
444                           const struct lu_name *lname,
445                           struct mdd_object *src_obj)
446 {
447         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
448         int rc = 0;
449         ENTRY;
450
451         if (!mdd_object_exists(src_obj))
452                 RETURN(-ENOENT);
453
454         if (mdd_is_dead_obj(src_obj))
455                 RETURN(-ESTALE);
456
457         /* Local ops, no lookup before link, check filename length here. */
458         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
459                 RETURN(-ENAMETOOLONG);
460
461         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
462                 RETURN(-EPERM);
463
464         if (S_ISDIR(mdd_object_type(src_obj)))
465                 RETURN(-EPERM);
466
467         LASSERT(src_obj != tgt_obj);
468         if (tgt_obj) {
469                 rc = mdd_may_create(env, tgt_obj, NULL, 1, 0);
470                 if (rc)
471                         RETURN(rc);
472         }
473
474         rc = __mdd_may_link(env, src_obj);
475
476         RETURN(rc);
477 }
478
479 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
480                                    const char *name, struct thandle *handle,
481                                    struct lustre_capa *capa)
482 {
483         struct dt_object *next = mdd_object_child(pobj);
484         int               rc;
485         ENTRY;
486
487         if (dt_try_as_dir(env, next)) {
488                 rc = next->do_index_ops->dio_delete(env, next,
489                                                     (struct dt_key *)name,
490                                                     handle, capa);
491         } else
492                 rc = -ENOTDIR;
493
494         RETURN(rc);
495 }
496
497 static int __mdd_index_insert_only(const struct lu_env *env,
498                                    struct mdd_object *pobj,
499                                    const struct lu_fid *lf, const char *name,
500                                    struct thandle *handle,
501                                    struct lustre_capa *capa)
502 {
503         struct dt_object *next = mdd_object_child(pobj);
504         int               rc;
505         ENTRY;
506
507         if (dt_try_as_dir(env, next)) {
508                 struct lu_ucred  *uc = lu_ucred_check(env);
509                 int ignore_quota;
510
511                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
512                 rc = next->do_index_ops->dio_insert(env, next,
513                                                     (struct dt_rec*)lf,
514                                                     (const struct dt_key *)name,
515                                                     handle, capa, ignore_quota);
516         } else {
517                 rc = -ENOTDIR;
518         }
519         RETURN(rc);
520 }
521
522 /* insert named index, add reference if isdir */
523 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
524                               const struct lu_fid *lf, const char *name, int is_dir,
525                               struct thandle *handle, struct lustre_capa *capa)
526 {
527         int               rc;
528         ENTRY;
529
530         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
531         if (rc == 0 && is_dir) {
532                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
533                 mdo_ref_add(env, pobj, handle);
534                 mdd_write_unlock(env, pobj);
535         }
536         RETURN(rc);
537 }
538
539 /* delete named index, drop reference if isdir */
540 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
541                               const char *name, int is_dir, struct thandle *handle,
542                               struct lustre_capa *capa)
543 {
544         int               rc;
545         ENTRY;
546
547         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
548         if (rc == 0 && is_dir) {
549                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
550                 mdo_ref_del(env, pobj, handle);
551                 mdd_write_unlock(env, pobj);
552         }
553
554         RETURN(rc);
555 }
556
557 int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
558                             int reclen, struct thandle *handle)
559 {
560         int rc;
561
562         /* XXX: this is a temporary solution to declare llog changes
563          *      will be fixed in 2.3 with new llog implementation */
564
565         LASSERT(mdd->mdd_capa);
566
567         /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
568          *      have to set the parameter 'size' as INT_MAX or 0 to inform
569          *      OSD that this record write is for a llog write or catalog
570          *      header update, and osd declare function will reserve less
571          *      credits for optimization purpose.
572          *
573          *      Reserve 6 blocks for a llog write, since the llog file is
574          *      usually small, reserve 2 blocks for catalog header update,
575          *      because we know for sure that catalog header is already
576          *      allocated.
577          *
578          *      This hack should be removed in 2.3.
579          */
580
581         /* record itself */
582         rc = dt_declare_record_write(env, mdd->mdd_capa,
583                                      DECLARE_LLOG_WRITE, 0, handle);
584         if (rc)
585                 return rc;
586
587         /* header will be updated as well */
588         rc = dt_declare_record_write(env, mdd->mdd_capa,
589                                      DECLARE_LLOG_WRITE, 0, handle);
590         if (rc)
591                 return rc;
592
593         /* also we should be able to create new plain log */
594         rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
595         if (rc)
596                 return rc;
597
598         /* new record referencing new plain llog */
599         rc = dt_declare_record_write(env, mdd->mdd_capa,
600                                      DECLARE_LLOG_WRITE, 0, handle);
601         if (rc)
602                 return rc;
603
604         /* catalog's header will be updated as well */
605         rc = dt_declare_record_write(env, mdd->mdd_capa,
606                                      DECLARE_LLOG_REWRITE, 0, handle);
607
608         return rc;
609 }
610
611 int mdd_declare_changelog_store(const struct lu_env *env,
612                                 struct mdd_device *mdd,
613                                 const struct lu_name *fname,
614                                 struct thandle *handle)
615 {
616         struct obd_device               *obd = mdd2obd_dev(mdd);
617         struct llog_ctxt                *ctxt;
618         struct llog_changelog_rec       *rec;
619         struct lu_buf                   *buf;
620         int                              reclen;
621         int                              rc;
622
623         /* Not recording */
624         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
625                 return 0;
626
627         reclen = llog_data_len(sizeof(*rec) +
628                                (fname != NULL ? fname->ln_namelen : 0));
629         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
630         if (buf->lb_buf == NULL)
631                 return -ENOMEM;
632
633         rec = buf->lb_buf;
634         rec->cr_hdr.lrh_len = reclen;
635         rec->cr_hdr.lrh_type = CHANGELOG_REC;
636
637         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
638         if (ctxt == NULL)
639                 return -ENXIO;
640
641         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
642         llog_ctxt_put(ctxt);
643
644         return rc;
645 }
646
647 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
648                                            struct mdd_device *mdd,
649                                            const struct lu_name *tname,
650                                            const struct lu_name *sname,
651                                            struct thandle *handle)
652 {
653         struct obd_device               *obd = mdd2obd_dev(mdd);
654         struct llog_ctxt                *ctxt;
655         struct llog_changelog_ext_rec   *rec;
656         struct lu_buf                   *buf;
657         int                              reclen;
658         int                              rc;
659
660         /* Not recording */
661         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
662                 return 0;
663
664         reclen = llog_data_len(sizeof(*rec) +
665                                (tname != NULL ? tname->ln_namelen : 0) +
666                                (sname != NULL ? 1 + sname->ln_namelen : 0));
667         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
668         if (buf->lb_buf == NULL)
669                 return -ENOMEM;
670
671         rec = buf->lb_buf;
672         rec->cr_hdr.lrh_len = reclen;
673         rec->cr_hdr.lrh_type = CHANGELOG_REC;
674
675         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
676         if (ctxt == NULL)
677                 return -ENXIO;
678
679         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
680         llog_ctxt_put(ctxt);
681
682         return rc;
683 }
684
685 /** Add a changelog entry \a rec to the changelog llog
686  * \param mdd
687  * \param rec
688  * \param handle - currently ignored since llogs start their own transaction;
689  *                 this will hopefully be fixed in llog rewrite
690  * \retval 0 ok
691  */
692 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
693                         struct llog_changelog_rec *rec, struct thandle *th)
694 {
695         struct obd_device       *obd = mdd2obd_dev(mdd);
696         struct llog_ctxt        *ctxt;
697         int                      rc;
698
699         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
700         rec->cr_hdr.lrh_type = CHANGELOG_REC;
701         rec->cr.cr_time = cl_time();
702
703         spin_lock(&mdd->mdd_cl.mc_lock);
704         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
705          * but as long as the MDD transactions are ordered correctly for e.g.
706          * rename conflicts, I don't think this should matter. */
707         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
708         spin_unlock(&mdd->mdd_cl.mc_lock);
709
710         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
711         if (ctxt == NULL)
712                 return -ENXIO;
713
714         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
715         llog_ctxt_put(ctxt);
716         if (rc > 0)
717                 rc = 0;
718         return rc;
719 }
720
721 /** Add a changelog_ext entry \a rec to the changelog llog
722  * \param mdd
723  * \param rec
724  * \param handle - currently ignored since llogs start their own transaction;
725  *              this will hopefully be fixed in llog rewrite
726  * \retval 0 ok
727  */
728 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
729                             struct llog_changelog_ext_rec *rec,
730                             struct thandle *th)
731 {
732         struct obd_device       *obd = mdd2obd_dev(mdd);
733         struct llog_ctxt        *ctxt;
734         int                      rc;
735
736         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
737         /* llog_lvfs_write_rec sets the llog tail len */
738         rec->cr_hdr.lrh_type = CHANGELOG_REC;
739         rec->cr.cr_time = cl_time();
740
741         spin_lock(&mdd->mdd_cl.mc_lock);
742         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
743          * but as long as the MDD transactions are ordered correctly for e.g.
744          * rename conflicts, I don't think this should matter. */
745         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
746         spin_unlock(&mdd->mdd_cl.mc_lock);
747
748         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
749         if (ctxt == NULL)
750                 return -ENXIO;
751
752         /* nested journal transaction */
753         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
754         llog_ctxt_put(ctxt);
755         if (rc > 0)
756                 rc = 0;
757
758         return rc;
759 }
760
761 /** Store a namespace change changelog record
762  * If this fails, we must fail the whole transaction; we don't
763  * want the change to commit without the log entry.
764  * \param target - mdd_object of change
765  * \param parent - parent dir/object
766  * \param tname - target name string
767  * \param handle - transacion handle
768  */
769 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
770                            enum changelog_rec_type type, unsigned flags,
771                            struct mdd_object *target, struct mdd_object *parent,
772                            const struct lu_name *tname, struct thandle *handle)
773 {
774         struct llog_changelog_rec *rec;
775         struct lu_buf *buf;
776         int reclen;
777         int rc;
778         ENTRY;
779
780         /* Not recording */
781         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
782                 RETURN(0);
783         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
784                 RETURN(0);
785
786         LASSERT(target != NULL);
787         LASSERT(parent != NULL);
788         LASSERT(tname != NULL);
789         LASSERT(handle != NULL);
790
791         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
792         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
793         if (buf->lb_buf == NULL)
794                 RETURN(-ENOMEM);
795         rec = buf->lb_buf;
796
797         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
798         rec->cr.cr_type = (__u32)type;
799         rec->cr.cr_tfid = *mdo2fid(target);
800         rec->cr.cr_pfid = *mdo2fid(parent);
801         rec->cr.cr_namelen = tname->ln_namelen;
802         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
803
804         target->mod_cltime = cfs_time_current_64();
805
806         rc = mdd_changelog_store(env, mdd, rec, handle);
807         if (rc < 0) {
808                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
809                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
810                         PFID(&rec->cr.cr_pfid));
811                 RETURN(-EFAULT);
812         }
813
814         RETURN(0);
815 }
816
817
818 /** Store a namespace change changelog record
819  * If this fails, we must fail the whole transaction; we don't
820  * want the change to commit without the log entry.
821  * \param target - mdd_object of change
822  * \param tpfid - target parent dir/object fid
823  * \param sfid - source object fid
824  * \param spfid - source parent fid
825  * \param tname - target name string
826  * \param sname - source name string
827  * \param handle - transacion handle
828  */
829 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
830                                       struct mdd_device    *mdd,
831                                       enum changelog_rec_type type,
832                                       unsigned flags,
833                                       struct mdd_object    *target,
834                                       const struct lu_fid  *tpfid,
835                                       const struct lu_fid  *sfid,
836                                       const struct lu_fid  *spfid,
837                                       const struct lu_name *tname,
838                                       const struct lu_name *sname,
839                                       struct thandle *handle)
840 {
841         struct llog_changelog_ext_rec *rec;
842         struct lu_buf *buf;
843         int reclen;
844         int rc;
845         ENTRY;
846
847         /* Not recording */
848         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
849                 RETURN(0);
850         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
851                 RETURN(0);
852
853         LASSERT(sfid != NULL);
854         LASSERT(tpfid != NULL);
855         LASSERT(tname != NULL);
856         LASSERT(handle != NULL);
857
858         reclen = llog_data_len(sizeof(*rec) +
859                                sname != NULL ? 1 + sname->ln_namelen : 0);
860         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
861         if (buf->lb_buf == NULL)
862                 RETURN(-ENOMEM);
863         rec = buf->lb_buf;
864
865         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
866         rec->cr.cr_type = (__u32)type;
867         rec->cr.cr_pfid = *tpfid;
868         rec->cr.cr_sfid = *sfid;
869         rec->cr.cr_spfid = *spfid;
870         rec->cr.cr_namelen = tname->ln_namelen;
871         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
872         if (sname) {
873                 rec->cr.cr_name[tname->ln_namelen] = '\0';
874                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
875                         sname->ln_namelen);
876                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
877         }
878
879         if (likely(target != NULL)) {
880                 rec->cr.cr_tfid = *mdo2fid(target);
881                 target->mod_cltime = cfs_time_current_64();
882         } else {
883                 fid_zero(&rec->cr.cr_tfid);
884         }
885
886         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
887         if (rc < 0) {
888                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
889                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
890                 return -EFAULT;
891         }
892
893         return 0;
894 }
895
896 static int __mdd_links_add(const struct lu_env *env,
897                            struct mdd_object *mdd_obj,
898                            struct linkea_data *ldata,
899                            const struct lu_name *lname,
900                            const struct lu_fid *pfid,
901                            int first, int check)
902 {
903         int rc;
904
905         if (ldata->ld_leh == NULL) {
906                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
907                 if (rc) {
908                         if (rc != -ENODATA)
909                                 return rc;
910                         rc = linkea_data_new(ldata,
911                                              &mdd_env_info(env)->mti_link_buf);
912                         if (rc)
913                                 return rc;
914                 }
915         }
916
917         if (check) {
918                 rc = linkea_links_find(ldata, lname, pfid);
919                 if (rc && rc != -ENOENT)
920                         return rc;
921                 if (rc == 0)
922                         return -EEXIST;
923         }
924
925         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
926                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
927
928                 *tfid = *pfid;
929                 tfid->f_ver = ~0;
930                 linkea_add_buf(ldata, lname, tfid);
931         }
932
933         return linkea_add_buf(ldata, lname, pfid);
934 }
935
936 static int __mdd_links_del(const struct lu_env *env,
937                            struct mdd_object *mdd_obj,
938                            struct linkea_data *ldata,
939                            const struct lu_name *lname,
940                            const struct lu_fid *pfid)
941 {
942         int rc;
943
944         if (ldata->ld_leh == NULL) {
945                 rc = mdd_links_read(env, mdd_obj, ldata);
946                 if (rc)
947                         return rc;
948         }
949
950         rc = linkea_links_find(ldata, lname, pfid);
951         if (rc)
952                 return rc;
953
954         linkea_del_buf(ldata, lname);
955         return 0;
956 }
957
958 static int mdd_linkea_prepare(const struct lu_env *env,
959                               struct mdd_object *mdd_obj,
960                               const struct lu_fid *oldpfid,
961                               const struct lu_name *oldlname,
962                               const struct lu_fid *newpfid,
963                               const struct lu_name *newlname,
964                               int first, int check,
965                               struct linkea_data *ldata)
966 {
967         int rc = 0;
968         int rc2 = 0;
969         ENTRY;
970
971         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
972                 return 0;
973
974         LASSERT(oldpfid != NULL || newpfid != NULL);
975
976         if (mdd_obj->mod_flags & DEAD_OBJ)
977                 /* No more links, don't bother */
978                 RETURN(0);
979
980         if (oldpfid != NULL) {
981                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
982                 if (rc) {
983                         if ((check == 0) ||
984                             (rc != -ENODATA && rc != -ENOENT))
985                                 RETURN(rc);
986                         /* No changes done. */
987                         rc = 0;
988                 }
989         }
990
991         /* If renaming, add the new record */
992         if (newpfid != NULL) {
993                 /* even if the add fails, we still delete the out-of-date
994                  * old link */
995                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
996                                       first, check);
997                 if (rc2 == -EEXIST)
998                         rc2 = 0;
999         }
1000
1001         rc = rc != 0 ? rc : rc2;
1002
1003         RETURN(rc);
1004 }
1005
1006 int mdd_links_rename(const struct lu_env *env,
1007                      struct mdd_object *mdd_obj,
1008                      const struct lu_fid *oldpfid,
1009                      const struct lu_name *oldlname,
1010                      const struct lu_fid *newpfid,
1011                      const struct lu_name *newlname,
1012                      struct thandle *handle,
1013                      struct linkea_data *ldata,
1014                      int first, int check)
1015 {
1016         int rc2 = 0;
1017         int rc = 0;
1018         ENTRY;
1019
1020         if (ldata == NULL) {
1021                 ldata = &mdd_env_info(env)->mti_link_data;
1022                 memset(ldata, 0, sizeof(*ldata));
1023                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1024                                         newpfid, newlname, first, check,
1025                                         ldata);
1026                 if (rc != 0)
1027                         GOTO(out, rc);
1028         }
1029
1030         if (ldata->ld_lee != NULL)
1031                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1032         EXIT;
1033 out:
1034         if (rc == 0)
1035                 rc = rc2;
1036         if (rc) {
1037                 int error = 1;
1038                 if (rc == -EOVERFLOW || rc == -ENOENT)
1039                         error = 0;
1040                 if (oldpfid == NULL)
1041                         CDEBUG(error ? D_ERROR : D_OTHER,
1042                                "link_ea add '%.*s' failed %d "DFID"\n",
1043                                newlname->ln_namelen, newlname->ln_name,
1044                                rc, PFID(mdd_object_fid(mdd_obj)));
1045                 else if (newpfid == NULL)
1046                         CDEBUG(error ? D_ERROR : D_OTHER,
1047                                "link_ea del '%.*s' failed %d "DFID"\n",
1048                                oldlname->ln_namelen, oldlname->ln_name,
1049                                rc, PFID(mdd_object_fid(mdd_obj)));
1050                 else
1051                         CDEBUG(error ? D_ERROR : D_OTHER,
1052                                "link_ea rename '%.*s'->'%.*s' failed %d "
1053                                DFID"\n",
1054                                oldlname->ln_namelen, oldlname->ln_name,
1055                                newlname->ln_namelen, newlname->ln_name,
1056                                rc, PFID(mdd_object_fid(mdd_obj)));
1057         }
1058
1059         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1060                 /* if we vmalloced a large buffer drop it */
1061                 lu_buf_free(ldata->ld_buf);
1062
1063         return rc;
1064 }
1065
1066 static inline int mdd_links_add(const struct lu_env *env,
1067                                 struct mdd_object *mdd_obj,
1068                                 const struct lu_fid *pfid,
1069                                 const struct lu_name *lname,
1070                                 struct thandle *handle,
1071                                 struct linkea_data *data, int first)
1072 {
1073         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1074                                 pfid, lname, handle, data, first, 0);
1075 }
1076
1077 static inline int mdd_links_del(const struct lu_env *env,
1078                                 struct mdd_object *mdd_obj,
1079                                 const struct lu_fid *pfid,
1080                                 const struct lu_name *lname,
1081                                 struct thandle *handle)
1082 {
1083         return mdd_links_rename(env, mdd_obj, pfid, lname,
1084                                 NULL, NULL, handle, NULL, 0, 0);
1085 }
1086
1087 /** Read the link EA into a temp buffer.
1088  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1089  * A pointer to the buffer is stored in \a ldata::ld_buf.
1090  *
1091  * \retval 0 or error
1092  */
1093 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1094                    struct linkea_data *ldata)
1095 {
1096         int rc;
1097
1098         /* First try a small buf */
1099         LASSERT(env != NULL);
1100         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1101                                                CFS_PAGE_SIZE);
1102         if (ldata->ld_buf->lb_buf == NULL)
1103                 return -ENOMEM;
1104
1105         if (!mdd_object_exists(mdd_obj))
1106                 return -ENODATA;
1107
1108         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1109                           BYPASS_CAPA);
1110         if (rc == -ERANGE) {
1111                 /* Buf was too small, figure out what we need. */
1112                 lu_buf_free(ldata->ld_buf);
1113                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1114                                    XATTR_NAME_LINK, BYPASS_CAPA);
1115                 if (rc < 0)
1116                         return rc;
1117                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1118                 if (ldata->ld_buf->lb_buf == NULL)
1119                         return -ENOMEM;
1120                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1121                                   XATTR_NAME_LINK, BYPASS_CAPA);
1122         }
1123         if (rc < 0)
1124                 return rc;
1125
1126         linkea_init(ldata);
1127         return 0;
1128 }
1129
1130 /** Read the link EA into a temp buffer.
1131  * Uses the name_buf since it is generally large.
1132  * \retval IS_ERR err
1133  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1134  */
1135 struct lu_buf *mdd_links_get(const struct lu_env *env,
1136                              struct mdd_object *mdd_obj)
1137 {
1138         struct linkea_data ldata = { 0 };
1139         int rc;
1140
1141         rc = mdd_links_read(env, mdd_obj, &ldata);
1142         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1143 }
1144
1145 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1146                     struct linkea_data *ldata, struct thandle *handle)
1147 {
1148         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1149                                                      ldata->ld_leh->leh_len);
1150         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1151                              mdd_object_capa(env, mdd_obj));
1152 }
1153
1154 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1155                           struct thandle *handle, struct linkea_data *ldata)
1156 {
1157         int     rc;
1158         int     ea_len;
1159         void    *linkea;
1160
1161         if (ldata != NULL && ldata->ld_lee != NULL) {
1162                 ea_len = ldata->ld_leh->leh_len;
1163                 linkea = ldata->ld_buf->lb_buf;
1164         } else {
1165                 ea_len = 4096;
1166                 linkea = NULL;
1167         }
1168
1169         /* XXX: max size? */
1170         rc = mdo_declare_xattr_set(env, mdd_obj,
1171                                    mdd_buf_get_const(env, linkea, ea_len),
1172                                    XATTR_NAME_LINK, 0, handle);
1173         return rc;
1174 }
1175
1176 static inline int mdd_declare_links_del(const struct lu_env *env,
1177                                         struct mdd_object *c,
1178                                         struct thandle *handle)
1179 {
1180         int rc = 0;
1181
1182         /* For directory, the linkEA will be removed together
1183          * with the object. */
1184         if (!S_ISDIR(mdd_object_type(c)))
1185                 rc = mdd_declare_links_add(env, c, handle, NULL);
1186
1187         return rc;
1188 }
1189
1190 static int mdd_declare_link(const struct lu_env *env,
1191                             struct mdd_device *mdd,
1192                             struct mdd_object *p,
1193                             struct mdd_object *c,
1194                             const struct lu_name *name,
1195                             struct thandle *handle,
1196                             struct linkea_data *data)
1197 {
1198         int rc;
1199
1200         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
1201         if (rc)
1202                 return rc;
1203
1204         rc = mdo_declare_ref_add(env, c, handle);
1205         if (rc)
1206                 return rc;
1207
1208         rc = mdo_declare_attr_set(env, p, NULL, handle);
1209         if (rc)
1210                 return rc;
1211
1212         rc = mdo_declare_attr_set(env, c, NULL, handle);
1213         if (rc)
1214                 return rc;
1215
1216         rc = mdd_declare_links_add(env, c, handle, data);
1217         if (rc)
1218                 return rc;
1219
1220         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1221
1222         return rc;
1223 }
1224
1225 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1226                     struct md_object *src_obj, const struct lu_name *lname,
1227                     struct md_attr *ma)
1228 {
1229         const char *name = lname->ln_name;
1230         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1231         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1232         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1233         struct mdd_device *mdd = mdo2mdd(src_obj);
1234         struct dynlock_handle *dlh;
1235         struct thandle *handle;
1236         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1237         int rc;
1238         ENTRY;
1239
1240         handle = mdd_trans_create(env, mdd);
1241         if (IS_ERR(handle))
1242                 GOTO(out_pending, rc = PTR_ERR(handle));
1243
1244         memset(ldata, 0, sizeof(*ldata));
1245
1246         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1247                               ldata);
1248         if (rc)
1249                 GOTO(stop, rc);
1250
1251         rc = mdd_trans_start(env, mdd, handle);
1252         if (rc)
1253                 GOTO(stop, rc);
1254
1255         dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
1256         if (dlh == NULL)
1257                 GOTO(out_trans, rc = -ENOMEM);
1258         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1259
1260         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
1261         if (rc)
1262                 GOTO(out_unlock, rc);
1263
1264         rc = mdo_ref_add(env, mdd_sobj, handle);
1265         if (rc)
1266                 GOTO(out_unlock, rc);
1267
1268
1269         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1270                                      name, handle,
1271                                      mdd_object_capa(env, mdd_tobj));
1272         if (rc != 0) {
1273                 mdo_ref_del(env, mdd_sobj, handle);
1274                 GOTO(out_unlock, rc);
1275         }
1276
1277         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1278         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1279
1280         la->la_valid = LA_CTIME | LA_MTIME;
1281         rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
1282         if (rc)
1283                 GOTO(out_unlock, rc);
1284
1285         la->la_valid = LA_CTIME;
1286         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
1287         if (rc == 0) {
1288                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1289                                         mdo2fid(mdd_tobj), lname, 0, 0,
1290                                         ldata);
1291                 if (rc == 0)
1292                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1293                                       lname, handle, ldata, 0);
1294                 /* The failure of links_add should not cause the link
1295                  * failure, reset rc here */
1296                 rc = 0;
1297         }
1298         EXIT;
1299 out_unlock:
1300         mdd_write_unlock(env, mdd_sobj);
1301         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
1302 out_trans:
1303         if (rc == 0)
1304                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1305                                             mdd_tobj, lname, handle);
1306 stop:
1307         mdd_trans_stop(env, mdd, rc, handle);
1308
1309         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1310                 /* if we vmalloced a large buffer drop it */
1311                 lu_buf_free(ldata->ld_buf);
1312 out_pending:
1313         return rc;
1314 }
1315
1316 int mdd_declare_finish_unlink(const struct lu_env *env,
1317                               struct mdd_object *obj,
1318                               struct md_attr *ma,
1319                               struct thandle *handle)
1320 {
1321         int     rc;
1322
1323         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1324         if (rc)
1325                 return rc;
1326
1327         return mdo_declare_destroy(env, obj, handle);
1328 }
1329
1330 /* caller should take a lock before calling */
1331 int mdd_finish_unlink(const struct lu_env *env,
1332                       struct mdd_object *obj, struct md_attr *ma,
1333                       struct thandle *th)
1334 {
1335         int rc = 0;
1336         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1337         ENTRY;
1338
1339         LASSERT(mdd_write_locked(env, obj) != 0);
1340
1341         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
1342                 obj->mod_flags |= DEAD_OBJ;
1343                 /* add new orphan and the object
1344                  * will be deleted during mdd_close() */
1345                 if (obj->mod_count) {
1346                         rc = __mdd_orphan_add(env, obj, th);
1347                         if (rc == 0)
1348                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1349                                         "orphan list, open count = %d\n",
1350                                         PFID(mdd_object_fid(obj)),
1351                                         obj->mod_count);
1352                         else
1353                                 CERROR("Object "DFID" fail to be an orphan, "
1354                                        "open count = %d, maybe cause failed "
1355                                        "open replay\n",
1356                                         PFID(mdd_object_fid(obj)),
1357                                         obj->mod_count);
1358                 } else {
1359                         rc = mdo_destroy(env, obj, th);
1360                 }
1361         }
1362
1363         RETURN(rc);
1364 }
1365
1366 /*
1367  * pobj maybe NULL
1368  * has mdd_write_lock on cobj already, but not on pobj yet
1369  */
1370 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1371                             struct mdd_object *cobj, struct lu_attr *cattr)
1372 {
1373         int rc;
1374         ENTRY;
1375
1376         rc = mdd_may_delete(env, pobj, cobj, cattr, NULL, 1, 1);
1377
1378         RETURN(rc);
1379 }
1380
1381 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1382                               struct mdd_object *p, struct mdd_object *c,
1383                               const struct lu_name *name, struct md_attr *ma,
1384                               struct thandle *handle, int no_name)
1385 {
1386         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1387         int rc;
1388
1389         if (likely(no_name == 0)) {
1390                 rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1391                 if (rc)
1392                         return rc;
1393         }
1394
1395         rc = mdo_declare_ref_del(env, p, handle);
1396         if (rc)
1397                 return rc;
1398
1399         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1400         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1401         la->la_valid = LA_CTIME | LA_MTIME;
1402         rc = mdo_declare_attr_set(env, p, la, handle);
1403         if (rc)
1404                 return rc;
1405
1406         if (c != NULL) {
1407                 rc = mdo_declare_ref_del(env, c, handle);
1408                 if (rc)
1409                         return rc;
1410
1411                 rc = mdo_declare_ref_del(env, c, handle);
1412                 if (rc)
1413                         return rc;
1414
1415                 rc = mdo_declare_attr_set(env, c, NULL, handle);
1416                 if (rc)
1417                         return rc;
1418
1419                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1420                 if (rc)
1421                         return rc;
1422
1423                 rc = mdd_declare_links_del(env, c, handle);
1424                 if (rc != 0)
1425                         return rc;
1426
1427                 /* FIXME: need changelog for remove entry */
1428                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1429         }
1430
1431         return rc;
1432 }
1433
1434 /**
1435  * Delete name entry and the object.
1436  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1437  * does not exist for this object, and it could only happen during resending
1438  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1439  * is also needed in this case(needed by changelog), so we have to add another
1440  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1441  * the ENOENT failure should be able to be fixed by redo mechanism.
1442  */
1443 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1444                       struct md_object *cobj, const struct lu_name *lname,
1445                       struct md_attr *ma, int no_name)
1446 {
1447         const char *name = lname->ln_name;
1448         struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
1449         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1450         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1451         struct mdd_object *mdd_cobj = NULL;
1452         struct mdd_device *mdd = mdo2mdd(pobj);
1453         struct dynlock_handle *dlh;
1454         struct thandle    *handle;
1455         int rc, is_dir = 0;
1456         ENTRY;
1457
1458         /* cobj == NULL means only delete name entry */
1459         if (likely(cobj != NULL)) {
1460                 mdd_cobj = md2mdd_obj(cobj);
1461                 if (mdd_object_exists(mdd_cobj) == 0)
1462                         RETURN(-ENOENT);
1463                 /* currently it is assume, it could only delete
1464                  * name entry of remote directory */
1465                 is_dir = 1;
1466         }
1467
1468         handle = mdd_trans_create(env, mdd);
1469         if (IS_ERR(handle))
1470                 RETURN(PTR_ERR(handle));
1471
1472         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1473                                 lname, ma, handle, no_name);
1474         if (rc)
1475                 GOTO(stop, rc);
1476
1477         rc = mdd_trans_start(env, mdd, handle);
1478         if (rc)
1479                 GOTO(stop, rc);
1480
1481         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
1482         if (dlh == NULL)
1483                 GOTO(stop, rc = -ENOMEM);
1484
1485         if (likely(mdd_cobj != NULL)) {
1486                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1487
1488                 /* fetch cattr */
1489                 rc = mdd_la_get(env, mdd_cobj, cattr,
1490                                 mdd_object_capa(env, mdd_cobj));
1491                 if (rc)
1492                         GOTO(cleanup, rc);
1493
1494                 is_dir = S_ISDIR(cattr->la_mode);
1495
1496         }
1497
1498         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
1499         if (rc)
1500                 GOTO(cleanup, rc);
1501
1502         if (likely(no_name == 0)) {
1503                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1504                                         mdd_object_capa(env, mdd_pobj));
1505                 if (rc)
1506                         GOTO(cleanup, rc);
1507         }
1508
1509         if (likely(mdd_cobj != NULL)) {
1510                 rc = mdo_ref_del(env, mdd_cobj, handle);
1511                 if (rc != 0) {
1512                         __mdd_index_insert_only(env, mdd_pobj,
1513                                                 mdo2fid(mdd_cobj),
1514                                                 name, handle,
1515                                                 mdd_object_capa(env, mdd_pobj));
1516                         GOTO(cleanup, rc);
1517                 }
1518
1519                 if (is_dir)
1520                         /* unlink dot */
1521                         mdo_ref_del(env, mdd_cobj, handle);
1522
1523                 /* fetch updated nlink */
1524                 rc = mdd_la_get(env, mdd_cobj, cattr,
1525                                 mdd_object_capa(env, mdd_cobj));
1526                 if (rc)
1527                         GOTO(cleanup, rc);
1528         }
1529
1530         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1531         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1532
1533         la->la_valid = LA_CTIME | LA_MTIME;
1534         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
1535         if (rc)
1536                 GOTO(cleanup, rc);
1537
1538         /* Enough for only unlink the entry */
1539         if (unlikely(mdd_cobj == NULL)) {
1540                 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1541                 GOTO(stop, rc);
1542         }
1543
1544         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1545                 /* update ctime of an unlinked file only if it is still
1546                  * opened or a link still exists */
1547                 la->la_valid = LA_CTIME;
1548                 rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
1549                 if (rc)
1550                         GOTO(cleanup, rc);
1551         }
1552
1553         /* XXX: this transfer to ma will be removed with LOD/OSP */
1554         ma->ma_attr = *cattr;
1555         ma->ma_valid |= MA_INODE;
1556         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1557
1558         /* fetch updated nlink */
1559         if (rc == 0)
1560                 rc = mdd_la_get(env, mdd_cobj, cattr,
1561                                 mdd_object_capa(env, mdd_cobj));
1562
1563         if (!is_dir)
1564                 /* old files may not have link ea; ignore errors */
1565                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1566
1567         /* if object is removed then we can't get its attrs, use last get */
1568         if (cattr->la_nlink == 0) {
1569                 ma->ma_attr = *cattr;
1570                 ma->ma_valid |= MA_INODE;
1571         }
1572         EXIT;
1573 cleanup:
1574         mdd_write_unlock(env, mdd_cobj);
1575         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1576         if (rc == 0) {
1577                 int cl_flags;
1578
1579                 cl_flags = (cattr->la_nlink == 0) ? CLF_UNLINK_LAST : 0;
1580                 if ((ma->ma_valid & MA_HSM) &&
1581                     (ma->ma_hsm.mh_flags & HS_EXISTS))
1582                         cl_flags |= CLF_UNLINK_HSM_EXISTS;
1583
1584                 rc = mdd_changelog_ns_store(env, mdd,
1585                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1586                         mdd_cobj, mdd_pobj, lname, handle);
1587         }
1588
1589 stop:
1590         mdd_trans_stop(env, mdd, rc, handle);
1591
1592         return rc;
1593 }
1594
1595 /*
1596  * The permission has been checked when obj created, no need check again.
1597  */
1598 static int mdd_cd_sanity_check(const struct lu_env *env,
1599                                struct mdd_object *obj)
1600 {
1601         ENTRY;
1602
1603         /* EEXIST check */
1604         if (!obj || mdd_is_dead_obj(obj))
1605                 RETURN(-ENOENT);
1606
1607         RETURN(0);
1608
1609 }
1610
1611 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1612                            struct md_object *cobj, const struct md_op_spec *spec,
1613                            struct md_attr *ma)
1614 {
1615         struct mdd_device *mdd = mdo2mdd(cobj);
1616         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1617         struct mdd_object *son = md2mdd_obj(cobj);
1618         struct thandle    *handle;
1619         const struct lu_buf *buf;
1620         struct lu_attr    *attr = &mdd_env_info(env)->mti_cattr;
1621         int                rc;
1622         ENTRY;
1623
1624         /* do not let users to create stripes via .lustre/
1625          * mdd_obf_setup() sets IMMUTE_OBJ on this directory */
1626         if (pobj && mdd_pobj->mod_flags & IMMUTE_OBJ)
1627                 RETURN(-ENOENT);
1628
1629         rc = mdd_cd_sanity_check(env, son);
1630         if (rc)
1631                 RETURN(rc);
1632
1633         if (!md_should_create(spec->sp_cr_flags))
1634                 RETURN(0);
1635
1636         /*
1637          * there are following use cases for this function:
1638          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1639          *    striping can be specified or not
1640          * 2) CMD?
1641          */
1642         rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
1643         if (rc)
1644                 RETURN(rc);
1645
1646         /* calling ->ah_make_hint() is used to transfer information from parent */
1647         mdd_object_make_hint(env, mdd_pobj, son, attr);
1648
1649         handle = mdd_trans_create(env, mdd);
1650         if (IS_ERR(handle))
1651                 GOTO(out_free, rc = PTR_ERR(handle));
1652
1653         /*
1654          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1655          * Should this be fixed?
1656          */
1657         CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
1658                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1659                spec->sp_cr_flags, spec->no_create);
1660
1661         if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
1662                 /* replay case or lfs setstripe */
1663                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1664                                         spec->u.sp_ea.eadatalen);
1665         } else {
1666                 buf = &LU_BUF_NULL;
1667         }
1668
1669         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1670                                   XATTR_NAME_LOV, 0, handle);
1671         if (rc)
1672                 GOTO(stop, rc);
1673
1674         rc = mdd_trans_start(env, mdd, handle);
1675         if (rc)
1676                 GOTO(stop, rc);
1677
1678         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1679                           0, handle, mdd_object_capa(env, son));
1680 stop:
1681         mdd_trans_stop(env, mdd, rc, handle);
1682 out_free:
1683         RETURN(rc);
1684 }
1685
1686 /* Get fid from name and parent */
1687 static int
1688 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
1689              const struct lu_name *lname, struct lu_fid* fid, int mask)
1690 {
1691         const char          *name = lname->ln_name;
1692         const struct dt_key *key = (const struct dt_key *)name;
1693         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
1694         struct mdd_device   *m = mdo2mdd(pobj);
1695         struct dt_object    *dir = mdd_object_child(mdd_obj);
1696         int rc;
1697         ENTRY;
1698
1699         if (unlikely(mdd_is_dead_obj(mdd_obj)))
1700                 RETURN(-ESTALE);
1701
1702         if (mdd_object_remote(mdd_obj)) {
1703                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
1704                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
1705         } else if (!mdd_object_exists(mdd_obj)) {
1706                 RETURN(-ESTALE);
1707         }
1708
1709         /* The common filename length check. */
1710         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
1711                 RETURN(-ENAMETOOLONG);
1712
1713         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
1714                                             MOR_TGT_PARENT);
1715         if (rc)
1716                 RETURN(rc);
1717
1718         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
1719                    dt_try_as_dir(env, dir))) {
1720
1721                 rc = dir->do_index_ops->dio_lookup(env, dir,
1722                                                  (struct dt_rec *)fid, key,
1723                                                  mdd_object_capa(env, mdd_obj));
1724                 if (rc > 0)
1725                         rc = 0;
1726                 else if (rc == 0)
1727                         rc = -ENOENT;
1728         } else
1729                 rc = -ENOTDIR;
1730
1731         RETURN(rc);
1732 }
1733
1734 static int mdd_declare_object_initialize(const struct lu_env *env,
1735                                          struct mdd_object *parent,
1736                                          struct mdd_object *child,
1737                                          struct lu_attr *attr,
1738                                          struct thandle *handle,
1739                                          struct linkea_data *ldata)
1740 {
1741         int rc;
1742         ENTRY;
1743
1744         /*
1745          * inode mode has been set in creation time, and it's based on umask,
1746          * la_mode and acl, don't set here again! (which will go wrong
1747          * because below function doesn't consider umask).
1748          * I'd suggest set all object attributes in creation time, see above.
1749          */
1750         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1751         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1752         rc = mdo_declare_attr_set(env, child, attr, handle);
1753         attr->la_valid |= LA_MODE | LA_TYPE;
1754         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1755                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1756                                               dot, handle);
1757                 if (rc == 0)
1758                         rc = mdo_declare_ref_add(env, child, handle);
1759
1760                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1761                                               dotdot, handle);
1762         }
1763
1764         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1765                         fid_is_dot_lustre(mdo2fid(child)) ||
1766                         fid_is_root(mdo2fid(child))))
1767                 mdd_declare_links_add(env, child, handle, ldata);
1768
1769         RETURN(rc);
1770 }
1771
1772 static int mdd_object_initialize(const struct lu_env *env,
1773                                  const struct lu_fid *pfid,
1774                                  const struct lu_name *lname,
1775                                  struct mdd_object *child,
1776                                  struct lu_attr *attr, struct thandle *handle,
1777                                  const struct md_op_spec *spec,
1778                                  struct linkea_data *ldata)
1779 {
1780         int rc;
1781         ENTRY;
1782
1783         /*
1784          * Update attributes for child.
1785          *
1786          * FIXME:
1787          *  (1) the valid bits should be converted between Lustre and Linux;
1788          *  (2) maybe, the child attributes should be set in OSD when creation.
1789          */
1790
1791         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1792         /* arguments are supposed to stay the same */
1793         if (S_ISDIR(attr->la_mode)) {
1794                 /* Add "." and ".." for newly created dir */
1795                 mdo_ref_add(env, child, handle);
1796                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1797                                              dot, handle, BYPASS_CAPA);
1798                 if (rc == 0)
1799                         rc = __mdd_index_insert_only(env, child, pfid,
1800                                                      dotdot, handle,
1801                                                      BYPASS_CAPA);
1802                 if (rc != 0)
1803                         mdo_ref_del(env, child, handle);
1804         }
1805
1806         if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
1807                         fid_is_dot_lustre(mdo2fid(child)) ||
1808                         fid_is_root(mdo2fid(child))))
1809                 mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
1810
1811         RETURN(rc);
1812 }
1813
1814 /* has not lock on pobj yet */
1815 static int mdd_create_sanity_check(const struct lu_env *env,
1816                                    struct md_object *pobj,
1817                                    struct lu_attr *pattr,
1818                                    const struct lu_name *lname,
1819                                    struct lu_attr *cattr,
1820                                    struct md_op_spec *spec)
1821 {
1822         struct mdd_thread_info *info = mdd_env_info(env);
1823         struct lu_fid     *fid       = &info->mti_fid;
1824         struct mdd_object *obj       = md2mdd_obj(pobj);
1825         struct mdd_device *m         = mdo2mdd(pobj);
1826         int rc;
1827         ENTRY;
1828
1829         /* EEXIST check */
1830         if (mdd_is_dead_obj(obj))
1831                 RETURN(-ENOENT);
1832
1833         /*
1834          * In some cases this lookup is not needed - we know before if name
1835          * exists or not because MDT performs lookup for it.
1836          * name length check is done in lookup.
1837          */
1838         if (spec->sp_cr_lookup) {
1839                 /*
1840                  * Check if the name already exist, though it will be checked in
1841                  * _index_insert also, for avoiding rolling back if exists
1842                  * _index_insert.
1843                  */
1844                 rc = __mdd_lookup_locked(env, pobj, lname, fid,
1845                                          MAY_WRITE | MAY_EXEC);
1846                 if (rc != -ENOENT)
1847                         RETURN(rc ? : -EEXIST);
1848         } else {
1849                 /*
1850                  * Check WRITE permission for the parent.
1851                  * EXEC permission have been checked
1852                  * when lookup before create already.
1853                  */
1854                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1855                                                     MOR_TGT_PARENT);
1856                 if (rc)
1857                         RETURN(rc);
1858         }
1859
1860         /* sgid check */
1861         if (pattr->la_mode & S_ISGID) {
1862                 cattr->la_gid = pattr->la_gid;
1863                 if (S_ISDIR(cattr->la_mode)) {
1864                         cattr->la_mode |= S_ISGID;
1865                         cattr->la_valid |= LA_MODE;
1866                 }
1867         }
1868
1869         switch (cattr->la_mode & S_IFMT) {
1870         case S_IFLNK: {
1871                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1872
1873                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1874                         RETURN(-ENAMETOOLONG);
1875                 else
1876                         RETURN(0);
1877         }
1878         case S_IFDIR:
1879         case S_IFREG:
1880         case S_IFCHR:
1881         case S_IFBLK:
1882         case S_IFIFO:
1883         case S_IFSOCK:
1884                 rc = 0;
1885                 break;
1886         default:
1887                 rc = -EINVAL;
1888                 break;
1889         }
1890         RETURN(rc);
1891 }
1892
1893 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1894                               struct mdd_object *p, struct mdd_object *c,
1895                               const struct lu_name *name,
1896                               struct lu_attr *attr,
1897                               int got_def_acl,
1898                               struct thandle *handle,
1899                               const struct md_op_spec *spec,
1900                               struct linkea_data *ldata)
1901 {
1902         int rc;
1903
1904         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1905         if (rc)
1906                 GOTO(out, rc);
1907
1908 #ifdef CONFIG_FS_POSIX_ACL
1909         if (got_def_acl > 0) {
1910                 struct lu_buf *acl_buf;
1911
1912                 acl_buf = mdd_buf_get(env, NULL, got_def_acl);
1913                 /* if dir, then can inherit default ACl */
1914                 if (S_ISDIR(attr->la_mode)) {
1915                         rc = mdo_declare_xattr_set(env, c, acl_buf,
1916                                                    XATTR_NAME_ACL_DEFAULT,
1917                                                    0, handle);
1918                         if (rc)
1919                                 GOTO(out, rc);
1920                 }
1921
1922                 rc = mdo_declare_attr_set(env, c, attr, handle);
1923                 if (rc)
1924                         GOTO(out, rc);
1925
1926                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1927                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1928                 if (rc)
1929                         GOTO(out, rc);
1930         }
1931 #endif
1932
1933         if (S_ISDIR(attr->la_mode)) {
1934                 rc = mdo_declare_ref_add(env, p, handle);
1935                 if (rc)
1936                         GOTO(out, rc);
1937         }
1938
1939         rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
1940         if (rc)
1941                 GOTO(out, rc);
1942
1943         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1944                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1945         else
1946                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1947                                               name->ln_name, handle);
1948         if (rc)
1949                 GOTO(out, rc);
1950
1951         /* replay case, create LOV EA from client data */
1952         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1953                 const struct lu_buf *buf;
1954
1955                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1956                                         spec->u.sp_ea.eadatalen);
1957                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
1958                                            0, handle);
1959                 if (rc)
1960                         GOTO(out, rc);
1961         }
1962
1963         if (S_ISLNK(attr->la_mode)) {
1964                 rc = dt_declare_record_write(env, mdd_object_child(c),
1965                                              strlen(spec->u.sp_symname), 0,
1966                                              handle);
1967                 if (rc)
1968                         GOTO(out, rc);
1969         }
1970
1971         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1972                 rc = mdo_declare_attr_set(env, p, attr, handle);
1973                 if (rc)
1974                         return rc;
1975         }
1976
1977         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1978         if (rc)
1979                 return rc;
1980
1981 out:
1982         return rc;
1983 }
1984
1985 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1986                         struct lu_attr *la, struct lu_buf *acl_buf,
1987                         int *got_def_acl, int *reset_acl)
1988 {
1989         int     rc;
1990         ENTRY;
1991
1992         if (S_ISLNK(la->la_mode))
1993                 RETURN(0);
1994
1995         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
1996         rc = mdo_xattr_get(env, pobj, acl_buf,
1997                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
1998         mdd_read_unlock(env, pobj);
1999         if (rc > 0) {
2000                 /* If there are default ACL, fix mode by default ACL */
2001                 *got_def_acl = rc;
2002                 acl_buf->lb_len = rc;
2003                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
2004                 if (rc < 0)
2005                         RETURN(rc);
2006                 *reset_acl = rc;
2007         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
2008                 /* If there are no default ACL, fix mode by mask */
2009                 struct lu_ucred *uc = lu_ucred(env);
2010
2011                 /* The create triggered by MDT internal events, such as
2012                  * LFSCK reset, will not contain valid "uc". */
2013                 if (unlikely(uc != NULL))
2014                         la->la_mode &= ~uc->uc_umask;
2015                 rc = 0;
2016         }
2017
2018         RETURN(rc);
2019 }
2020
2021 /*
2022  * Create object and insert it into namespace.
2023  */
2024 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2025                       const struct lu_name *lname, struct md_object *child,
2026                       struct md_op_spec *spec, struct md_attr* ma)
2027 {
2028         struct mdd_thread_info  *info = mdd_env_info(env);
2029         struct lu_attr          *la = &info->mti_la_for_fix;
2030         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2031         struct mdd_object       *son = md2mdd_obj(child);
2032         struct mdd_device       *mdd = mdo2mdd(pobj);
2033         struct lu_attr          *attr = &ma->ma_attr;
2034         struct thandle          *handle;
2035         struct lu_attr          *pattr = &info->mti_pattr;
2036         struct lu_buf           acl_buf;
2037         struct linkea_data      *ldata = &info->mti_link_data;
2038         struct dynlock_handle   *dlh;
2039         const char              *name = lname->ln_name;
2040         int                      rc, created = 0, initialized = 0, inserted = 0;
2041         int                      got_def_acl = 0;
2042         int                      reset_acl = 0;
2043         ENTRY;
2044
2045         /*
2046          * Two operations have to be performed:
2047          *
2048          *  - an allocation of a new object (->do_create()), and
2049          *
2050          *  - an insertion into a parent index (->dio_insert()).
2051          *
2052          * Due to locking, operation order is not important, when both are
2053          * successful, *but* error handling cases are quite different:
2054          *
2055          *  - if insertion is done first, and following object creation fails,
2056          *  insertion has to be rolled back, but this operation might fail
2057          *  also leaving us with dangling index entry.
2058          *
2059          *  - if creation is done first, is has to be undone if insertion
2060          *  fails, leaving us with leaked space, which is neither good, nor
2061          *  fatal.
2062          *
2063          * It seems that creation-first is simplest solution, but it is
2064          * sub-optimal in the frequent
2065          *
2066          *         $ mkdir foo
2067          *         $ mkdir foo
2068          *
2069          * case, because second mkdir is bound to create object, only to
2070          * destroy it immediately.
2071          *
2072          * To avoid this follow local file systems that do double lookup:
2073          *
2074          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2075          *
2076          *     1. create            (mdd_object_create_internal())
2077          *
2078          *     2. insert            (__mdd_index_insert(), lookup again)
2079          */
2080
2081         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2082         if (rc != 0)
2083                 RETURN(rc);
2084
2085         /* Sanity checks before big job. */
2086         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2087         if (rc)
2088                 RETURN(rc);
2089
2090         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2091                 GOTO(out_free, rc = -EINPROGRESS);
2092
2093         acl_buf.lb_buf = info->mti_xattr_buf;
2094         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2095         rc = mdd_acl_init(env, mdd_pobj, attr, &acl_buf, &got_def_acl,
2096                           &reset_acl);
2097         if (rc < 0)
2098                 GOTO(out_free, rc);
2099
2100         mdd_object_make_hint(env, mdd_pobj, son, attr);
2101
2102         handle = mdd_trans_create(env, mdd);
2103         if (IS_ERR(handle))
2104                 GOTO(out_free, rc = PTR_ERR(handle));
2105
2106         memset(ldata, 0, sizeof(*ldata));
2107         mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
2108                            lname, 1, 0, ldata);
2109         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2110                                 got_def_acl, handle, spec, ldata);
2111         if (rc)
2112                 GOTO(out_stop, rc);
2113
2114         rc = mdd_trans_start(env, mdd, handle);
2115         if (rc)
2116                 GOTO(out_stop, rc);
2117
2118         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
2119         if (dlh == NULL)
2120                 GOTO(out_trans, rc = -ENOMEM);
2121
2122         mdd_write_lock(env, son, MOR_TGT_CHILD);
2123         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
2124         if (rc) {
2125                 mdd_write_unlock(env, son);
2126                 GOTO(cleanup, rc);
2127         }
2128
2129         created = 1;
2130
2131 #ifdef CONFIG_FS_POSIX_ACL
2132         if (got_def_acl) {
2133                 /* set default acl */
2134                 if (S_ISDIR(attr->la_mode)) {
2135                         LASSERTF(acl_buf.lb_len  == got_def_acl,
2136                                  "invalid acl_buf: %p:%d got_def %d\n",
2137                                  acl_buf.lb_buf, (int)acl_buf.lb_len,
2138                                  got_def_acl);
2139                         rc = mdo_xattr_set(env, son, &acl_buf,
2140                                            XATTR_NAME_ACL_DEFAULT, 0,
2141                                            handle, BYPASS_CAPA);
2142                         if (rc) {
2143                                 mdd_write_unlock(env, son);
2144                                 GOTO(cleanup, rc);
2145                         }
2146                 }
2147
2148                 /* set its own acl */
2149                 if (reset_acl) {
2150                         LASSERTF(acl_buf.lb_buf != NULL && acl_buf.lb_len != 0,
2151                                  "invalid acl_buf %p:%d\n", acl_buf.lb_buf,
2152                                  (int)acl_buf.lb_len);
2153                         rc = mdo_xattr_set(env, son, &acl_buf,
2154                                            XATTR_NAME_ACL_ACCESS,
2155                                            0, handle, BYPASS_CAPA);
2156                         if (rc) {
2157                                 mdd_write_unlock(env, son);
2158                                 GOTO(cleanup, rc);
2159                         }
2160                 }
2161         }
2162 #endif
2163
2164         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
2165                                    son, attr, handle, spec, ldata);
2166
2167         /*
2168          * in case of replay we just set LOVEA provided by the client
2169          * XXX: I think it would be interesting to try "old" way where
2170          *      MDT calls this xattr_set(LOV) in a different transaction.
2171          *      probably this way we code can be made better.
2172          */
2173         if (rc == 0 && (spec->no_create ||
2174                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
2175                 const struct lu_buf *buf;
2176
2177                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2178                                 spec->u.sp_ea.eadatalen);
2179                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
2180                                 BYPASS_CAPA);
2181         }
2182
2183         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2184                 rc = __mdd_orphan_add(env, son, handle);
2185
2186         mdd_write_unlock(env, son);
2187
2188         if (rc != 0)
2189                 /*
2190                  * Object has no links, so it will be destroyed when last
2191                  * reference is released. (XXX not now.)
2192                  */
2193                 GOTO(cleanup, rc);
2194
2195         initialized = 1;
2196
2197         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
2198                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2199                                         name, S_ISDIR(attr->la_mode), handle,
2200                                         mdd_object_capa(env, mdd_pobj));
2201
2202         if (rc != 0)
2203                 GOTO(cleanup, rc);
2204
2205         inserted = 1;
2206
2207         if (S_ISLNK(attr->la_mode)) {
2208                 struct lu_ucred  *uc = lu_ucred_assert(env);
2209                 struct dt_object *dt = mdd_object_child(son);
2210                 const char *target_name = spec->u.sp_symname;
2211                 int sym_len = strlen(target_name);
2212                 const struct lu_buf *buf;
2213                 loff_t pos = 0;
2214
2215                 buf = mdd_buf_get_const(env, target_name, sym_len);
2216                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2217                                                 mdd_object_capa(env, son),
2218                                                 uc->uc_cap &
2219                                                 CFS_CAP_SYS_RESOURCE_MASK);
2220
2221                 if (rc == sym_len)
2222                         rc = 0;
2223                 else
2224                         GOTO(cleanup, rc = -EFAULT);
2225         }
2226
2227         /* volatile file creation does not update parent directory times */
2228         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2229                 GOTO(cleanup, rc = 0);
2230
2231         /* update parent directory mtime/ctime */
2232         *la = *attr;
2233         la->la_valid = LA_CTIME | LA_MTIME;
2234         rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
2235         if (rc)
2236                 GOTO(cleanup, rc);
2237
2238         EXIT;
2239 cleanup:
2240         if (rc != 0 && created != 0) {
2241                 int rc2;
2242
2243                 if (inserted != 0) {
2244                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2245                                 rc2 = __mdd_orphan_del(env, son, handle);
2246                         else
2247                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
2248                                                          S_ISDIR(attr->la_mode),
2249                                                          handle, BYPASS_CAPA);
2250                         if (rc2 != 0)
2251                                 goto out_stop;
2252                 }
2253
2254                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2255                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
2256                         /* Drop the reference, no need to delete "."/"..",
2257                          * because the object to be destroied directly. */
2258                         rc2 = mdo_ref_del(env, son, handle);
2259                         if (rc2 != 0) {
2260                                 mdd_write_unlock(env, son);
2261                                 goto out_stop;
2262                         }
2263                 }
2264
2265                 rc2 = mdo_ref_del(env, son, handle);
2266                 if (rc2 != 0) {
2267                         mdd_write_unlock(env, son);
2268                         goto out_stop;
2269                 }
2270
2271                 mdo_destroy(env, son, handle);
2272                 mdd_write_unlock(env, son);
2273         }
2274
2275         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
2276 out_trans:
2277         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)))
2278                 rc = mdd_changelog_ns_store(env, mdd,
2279                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
2280                         S_ISREG(attr->la_mode) ? CL_CREATE :
2281                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2282                         0, son, mdd_pobj, lname, handle);
2283 out_stop:
2284         mdd_trans_stop(env, mdd, rc, handle);
2285 out_free:
2286         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2287                 /* if we vmalloced a large buffer drop it */
2288                 lu_buf_free(ldata->ld_buf);
2289
2290         /* The child object shouldn't be cached anymore */
2291         if (rc)
2292                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2293                             &child->mo_lu.lo_header->loh_flags);
2294         return rc;
2295 }
2296
2297 /*
2298  * Get locks on parents in proper order
2299  * RETURN: < 0 - error, rename_order if successful
2300  */
2301 enum rename_order {
2302         MDD_RN_SAME,
2303         MDD_RN_SRCTGT,
2304         MDD_RN_TGTSRC
2305 };
2306
2307 static int mdd_rename_order(const struct lu_env *env,
2308                             struct mdd_device *mdd,
2309                             struct mdd_object *src_pobj,
2310                             struct mdd_object *tgt_pobj)
2311 {
2312         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2313         int rc;
2314         ENTRY;
2315
2316         if (src_pobj == tgt_pobj)
2317                 RETURN(MDD_RN_SAME);
2318
2319         /* compared the parent child relationship of src_p&tgt_p */
2320         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2321                 rc = MDD_RN_SRCTGT;
2322         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2323                 rc = MDD_RN_TGTSRC;
2324         } else {
2325                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
2326                 if (rc == -EREMOTE)
2327                         rc = 0;
2328
2329                 if (rc == 1)
2330                         rc = MDD_RN_TGTSRC;
2331                 else if (rc == 0)
2332                         rc = MDD_RN_SRCTGT;
2333         }
2334
2335         RETURN(rc);
2336 }
2337
2338 /* has not mdd_write{read}_lock on any obj yet. */
2339 static int mdd_rename_sanity_check(const struct lu_env *env,
2340                                    struct mdd_object *src_pobj,
2341                                    struct mdd_object *tgt_pobj,
2342                                    struct mdd_object *sobj,
2343                                    struct mdd_object *tobj,
2344                                    struct lu_attr *so_attr,
2345                                    struct lu_attr *tg_attr)
2346 {
2347         int rc = 0;
2348         ENTRY;
2349
2350         /* XXX: when get here, sobj must NOT be NULL,
2351          * the other case has been processed in cld_rename
2352          * before mdd_rename and enable MDS_PERM_BYPASS. */
2353         LASSERT(sobj);
2354
2355         rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
2356         if (rc)
2357                 RETURN(rc);
2358
2359         /* XXX: when get here, "tobj == NULL" means tobj must
2360          * NOT exist (neither on remote MDS, such case has been
2361          * processed in cld_rename before mdd_rename and enable
2362          * MDS_PERM_BYPASS).
2363          * So check may_create, but not check may_unlink. */
2364         if (!tobj)
2365                 rc = mdd_may_create(env, tgt_pobj, NULL,
2366                                     (src_pobj != tgt_pobj), 0);
2367         else
2368                 rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
2369                                     (src_pobj != tgt_pobj), 1);
2370
2371         if (!rc && !tobj && (src_pobj != tgt_pobj) &&
2372             S_ISDIR(so_attr->la_mode))
2373                 rc = __mdd_may_link(env, tgt_pobj);
2374
2375         RETURN(rc);
2376 }
2377
2378 static int mdd_declare_rename(const struct lu_env *env,
2379                               struct mdd_device *mdd,
2380                               struct mdd_object *mdd_spobj,
2381                               struct mdd_object *mdd_tpobj,
2382                               struct mdd_object *mdd_sobj,
2383                               struct mdd_object *mdd_tobj,
2384                               const struct lu_name *tname,
2385                               const struct lu_name *sname,
2386                               struct md_attr *ma,
2387                               struct thandle *handle)
2388 {
2389         int rc;
2390
2391         LASSERT(mdd_spobj);
2392         LASSERT(mdd_tpobj);
2393         LASSERT(mdd_sobj);
2394
2395         /* name from source dir */
2396         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2397         if (rc)
2398                 return rc;
2399
2400         /* .. from source child */
2401         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2402                 /* source child can be directory,
2403                  * counted by source dir's nlink */
2404                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2405                 if (rc)
2406                         return rc;
2407
2408                 rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
2409                 if (rc)
2410                         return rc;
2411
2412                 rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
2413                                               dotdot, handle);
2414                 if (rc)
2415                         return rc;
2416
2417                 /* new target child can be directory,
2418                  * counted by target dir's nlink */
2419                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2420                 if (rc)
2421                         return rc;
2422
2423         }
2424
2425         rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
2426         if (rc)
2427                 return rc;
2428
2429         rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
2430         if (rc)
2431                 return rc;
2432         mdd_declare_links_add(env, mdd_sobj, handle, NULL);
2433         if (rc)
2434                 return rc;
2435
2436         rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
2437         if (rc)
2438                 return rc;
2439
2440         /* new name */
2441         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2442                         tname->ln_name, handle);
2443         if (rc)
2444                 return rc;
2445
2446         /* name from target dir (old name), we declare it unconditionally
2447          * as mdd_rename() calls delete unconditionally as well. so just
2448          * to balance declarations vs calls to change ... */
2449         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2450         if (rc)
2451                 return rc;
2452
2453         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2454                 /* delete target child in target parent directory */
2455                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2456                 if (rc)
2457                         return rc;
2458
2459                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2460                         /* target child can be directory,
2461                          * delete "." reference in target child directory */
2462                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2463                         if (rc)
2464                                 return rc;
2465
2466                         /* delete ".." reference in target parent directory */
2467                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2468                         if (rc)
2469                                 return rc;
2470                 }
2471
2472                 rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
2473                 if (rc)
2474                         return rc;
2475
2476                 mdd_declare_links_del(env, mdd_tobj, handle);
2477                 if (rc)
2478                         return rc;
2479
2480                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2481                 if (rc)
2482                         return rc;
2483         }
2484
2485         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2486         if (rc)
2487                 return rc;
2488
2489         return rc;
2490 }
2491
2492 /* src object can be remote that is why we use only fid and type of object */
2493 static int mdd_rename(const struct lu_env *env,
2494                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2495                       const struct lu_fid *lf, const struct lu_name *lsname,
2496                       struct md_object *tobj, const struct lu_name *ltname,
2497                       struct md_attr *ma)
2498 {
2499         const char *sname = lsname->ln_name;
2500         const char *tname = ltname->ln_name;
2501         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2502         struct lu_attr    *so_attr = &mdd_env_info(env)->mti_cattr;
2503         struct lu_attr    *tg_attr = &mdd_env_info(env)->mti_pattr;
2504         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2505         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2506         struct mdd_device *mdd = mdo2mdd(src_pobj);
2507         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2508         struct mdd_object *mdd_tobj = NULL;
2509         struct dynlock_handle *sdlh, *tdlh;
2510         struct thandle *handle;
2511         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2512         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2513         bool is_dir;
2514         bool tobj_ref = 0;
2515         bool tobj_locked = 0;
2516         unsigned cl_flags = 0;
2517         int rc, rc2;
2518         ENTRY;
2519
2520         if (tobj)
2521                 mdd_tobj = md2mdd_obj(tobj);
2522
2523         mdd_sobj = mdd_object_find(env, mdd, lf);
2524
2525         handle = mdd_trans_create(env, mdd);
2526         if (IS_ERR(handle))
2527                 GOTO(out_pending, rc = PTR_ERR(handle));
2528
2529         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2530                                 mdd_tobj, lsname, ltname, ma, handle);
2531         if (rc)
2532                 GOTO(stop, rc);
2533
2534         rc = mdd_trans_start(env, mdd, handle);
2535         if (rc)
2536                 GOTO(stop, rc);
2537
2538         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2539         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
2540         if (rc < 0)
2541                 GOTO(cleanup_unlocked, rc);
2542
2543         /* Get locks in determined order */
2544         if (rc == MDD_RN_SAME) {
2545                 sdlh = mdd_pdo_write_lock(env, mdd_spobj,
2546                                           sname, MOR_SRC_PARENT);
2547                 /* check hashes to determine do we need one lock or two */
2548                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
2549                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
2550                                 MOR_TGT_PARENT);
2551                 else
2552                         tdlh = sdlh;
2553         } else if (rc == MDD_RN_SRCTGT) {
2554                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
2555                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
2556         } else {
2557                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
2558                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
2559         }
2560         if (sdlh == NULL || tdlh == NULL)
2561                 GOTO(cleanup, rc = -ENOMEM);
2562
2563         rc = mdd_la_get(env, mdd_sobj, so_attr,
2564                         mdd_object_capa(env, mdd_sobj));
2565         if (rc)
2566                 GOTO(cleanup, rc);
2567
2568         if (mdd_tobj) {
2569                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2570                                 mdd_object_capa(env, mdd_tobj));
2571                 if (rc)
2572                         GOTO(cleanup, rc);
2573         }
2574
2575         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj,
2576                                      mdd_tobj, so_attr, tg_attr);
2577         if (rc)
2578                 GOTO(cleanup, rc);
2579
2580         is_dir = S_ISDIR(so_attr->la_mode);
2581
2582         /* Remove source name from source directory */
2583         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2584                                 mdd_object_capa(env, mdd_spobj));
2585         if (rc)
2586                 GOTO(cleanup, rc);
2587
2588         /* "mv dir1 dir2" needs "dir1/.." link update */
2589         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2590                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2591                                         mdd_object_capa(env, mdd_sobj));
2592                 if (rc)
2593                         GOTO(fixup_spobj2, rc);
2594
2595                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2596                                       handle, mdd_object_capa(env, mdd_sobj));
2597                 if (rc)
2598                         GOTO(fixup_spobj, rc);
2599         }
2600
2601         /* Remove target name from target directory
2602          * Here tobj can be remote one, so we do index_delete unconditionally
2603          * and -ENOENT is allowed.
2604          */
2605         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2606                                 mdd_object_capa(env, mdd_tpobj));
2607         if (rc != 0) {
2608                 if (mdd_tobj) {
2609                         /* tname might been renamed to something else */
2610                         GOTO(fixup_spobj, rc);
2611                 }
2612                 if (rc != -ENOENT)
2613                         GOTO(fixup_spobj, rc);
2614         }
2615
2616         /* Insert new fid with target name into target dir */
2617         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2618                                 mdd_object_capa(env, mdd_tpobj));
2619         if (rc)
2620                 GOTO(fixup_tpobj, rc);
2621
2622         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2623         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2624
2625         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2626         if (mdd_sobj) {
2627                 la->la_valid = LA_CTIME;
2628                 rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
2629                 if (rc)
2630                         GOTO(fixup_tpobj, rc);
2631         }
2632
2633         /* Remove old target object
2634          * For tobj is remote case cmm layer has processed
2635          * and set tobj to NULL then. So when tobj is NOT NULL,
2636          * it must be local one.
2637          */
2638         if (tobj && mdd_object_exists(mdd_tobj)) {
2639                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2640                 tobj_locked = 1;
2641                 if (mdd_is_dead_obj(mdd_tobj)) {
2642                         /* shld not be dead, something is wrong */
2643                         CERROR("tobj is dead, something is wrong\n");
2644                         rc = -EINVAL;
2645                         goto cleanup;
2646                 }
2647                 mdo_ref_del(env, mdd_tobj, handle);
2648
2649                 /* Remove dot reference. */
2650                 if (S_ISDIR(tg_attr->la_mode))
2651                         mdo_ref_del(env, mdd_tobj, handle);
2652                 tobj_ref = 1;
2653
2654                 /* fetch updated nlink */
2655                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2656                                 mdd_object_capa(env, mdd_tobj));
2657                 if (rc != 0) {
2658                         CERROR("%s: Failed to get nlink for tobj "
2659                                 DFID": rc = %d\n",
2660                                 mdd2obd_dev(mdd)->obd_name,
2661                                 PFID(tpobj_fid), rc);
2662                         GOTO(fixup_tpobj, rc);
2663                 }
2664
2665                 la->la_valid = LA_CTIME;
2666                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
2667                 if (rc != 0) {
2668                         CERROR("%s: Failed to set ctime for tobj "
2669                                 DFID": rc = %d\n",
2670                                 mdd2obd_dev(mdd)->obd_name,
2671                                 PFID(tpobj_fid), rc);
2672                         GOTO(fixup_tpobj, rc);
2673                 }
2674
2675                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2676                 ma->ma_attr = *tg_attr;
2677                 ma->ma_valid |= MA_INODE;
2678                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2679                 if (rc != 0) {
2680                         CERROR("%s: Failed to unlink tobj "
2681                                 DFID": rc = %d\n",
2682                                 mdd2obd_dev(mdd)->obd_name,
2683                                 PFID(tpobj_fid), rc);
2684                         GOTO(fixup_tpobj, rc);
2685                 }
2686
2687                 /* fetch updated nlink */
2688                 rc = mdd_la_get(env, mdd_tobj, tg_attr,
2689                                 mdd_object_capa(env, mdd_tobj));
2690                 if (rc != 0) {
2691                         CERROR("%s: Failed to get nlink for tobj "
2692                                 DFID": rc = %d\n",
2693                                 mdd2obd_dev(mdd)->obd_name,
2694                                 PFID(tpobj_fid), rc);
2695                         GOTO(fixup_tpobj, rc);
2696                 }
2697                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2698                 ma->ma_attr = *tg_attr;
2699                 ma->ma_valid |= MA_INODE;
2700
2701                 if (so_attr->la_nlink == 0)
2702                         cl_flags |= CLF_RENAME_LAST;
2703         }
2704
2705         la->la_valid = LA_CTIME | LA_MTIME;
2706         rc = mdd_attr_check_set_internal(env, mdd_spobj, la, handle, 0);
2707         if (rc)
2708                 GOTO(fixup_tpobj, rc);
2709
2710         if (mdd_spobj != mdd_tpobj) {
2711                 la->la_valid = LA_CTIME | LA_MTIME;
2712                 rc = mdd_attr_check_set_internal(env, mdd_tpobj, la,
2713                                                  handle, 0);
2714         }
2715
2716         if (rc == 0 && mdd_sobj) {
2717                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2718                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2719                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2720                                       0, 0);
2721                 if (rc == -ENOENT)
2722                         /* Old files might not have EA entry */
2723                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2724                                       lsname, handle, NULL, 0);
2725                 mdd_write_unlock(env, mdd_sobj);
2726                 /* We don't fail the transaction if the link ea can't be
2727                    updated -- fid2path will use alternate lookup method. */
2728                 rc = 0;
2729         }
2730
2731         EXIT;
2732
2733 fixup_tpobj:
2734         if (rc) {
2735                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2736                                          BYPASS_CAPA);
2737                 if (rc2)
2738                         CWARN("tp obj fix error %d\n",rc2);
2739
2740                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2741                     !mdd_is_dead_obj(mdd_tobj)) {
2742                         if (tobj_ref) {
2743                                 mdo_ref_add(env, mdd_tobj, handle);
2744                                 if (is_dir)
2745                                         mdo_ref_add(env, mdd_tobj, handle);
2746                         }
2747
2748                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2749                                          mdo2fid(mdd_tobj), tname,
2750                                          is_dir, handle,
2751                                          BYPASS_CAPA);
2752
2753                         if (rc2)
2754                                 CWARN("tp obj fix error %d\n",rc2);
2755                 }
2756         }
2757
2758 fixup_spobj:
2759         if (rc && is_dir && mdd_sobj) {
2760                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2761                                               BYPASS_CAPA);
2762
2763                 if (rc2)
2764                         CWARN("sp obj dotdot delete error %d\n",rc2);
2765
2766
2767                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2768                                               dotdot, handle, BYPASS_CAPA);
2769                 if (rc2)
2770                         CWARN("sp obj dotdot insert error %d\n",rc2);
2771         }
2772
2773 fixup_spobj2:
2774         if (rc) {
2775                 rc2 = __mdd_index_insert(env, mdd_spobj,
2776                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2777                 if (rc2)
2778                         CWARN("sp obj fix error %d\n",rc2);
2779         }
2780 cleanup:
2781         if (tobj_locked)
2782                 mdd_write_unlock(env, mdd_tobj);
2783         if (likely(tdlh) && sdlh != tdlh)
2784                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
2785         if (likely(sdlh))
2786                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
2787 cleanup_unlocked:
2788         if (rc == 0)
2789                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2790                                                 mdd_tobj, tpobj_fid, lf,
2791                                                 spobj_fid, ltname, lsname,
2792                                                 handle);
2793
2794 stop:
2795         mdd_trans_stop(env, mdd, rc, handle);
2796 out_pending:
2797         mdd_object_put(env, mdd_sobj);
2798         return rc;
2799 }
2800
2801 const struct md_dir_operations mdd_dir_ops = {
2802         .mdo_is_subdir     = mdd_is_subdir,
2803         .mdo_lookup        = mdd_lookup,
2804         .mdo_create        = mdd_create,
2805         .mdo_rename        = mdd_rename,
2806         .mdo_link          = mdd_link,
2807         .mdo_unlink        = mdd_unlink,
2808         .mdo_create_data   = mdd_create_data,
2809 };