Whamcloud - gitweb
LU-4471 mdd: mdd_unlink: do trans_start after sanity check
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_dir.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@intel.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_mds.h>
48 #include <lustre_fid.h>
49
50 #include "mdd_internal.h"
51
52 static const char dot[] = ".";
53 static const char dotdot[] = "..";
54
55 static struct lu_name lname_dotdot = {
56         (char *) dotdot,
57         sizeof(dotdot) - 1
58 };
59
60 /* Get FID from name and parent */
61 static int
62 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
63              const struct lu_attr *pattr, const struct lu_name *lname,
64              struct lu_fid* fid, int mask)
65 {
66         const char *name                = lname->ln_name;
67         const struct dt_key *key        = (const struct dt_key *)name;
68         struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
69         struct mdd_device *m            = mdo2mdd(pobj);
70         struct dt_object *dir           = mdd_object_child(mdd_obj);
71         int rc;
72         ENTRY;
73
74         if (unlikely(mdd_is_dead_obj(mdd_obj)))
75                 RETURN(-ESTALE);
76
77         if (mdd_object_remote(mdd_obj)) {
78                 CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
79                        mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
80         } else if (!mdd_object_exists(mdd_obj)) {
81                 RETURN(-ESTALE);
82         }
83
84         /* The common filename length check. */
85         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
86                 RETURN(-ENAMETOOLONG);
87
88         rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
89                                             MOR_TGT_PARENT);
90         if (rc)
91                 RETURN(rc);
92
93         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
94                    dt_try_as_dir(env, dir))) {
95
96                 rc = dir->do_index_ops->dio_lookup(env, dir,
97                                                  (struct dt_rec *)fid, key,
98                                                  mdd_object_capa(env, mdd_obj));
99                 if (rc > 0)
100                         rc = 0;
101                 else if (rc == 0)
102                         rc = -ENOENT;
103         } else
104                 rc = -ENOTDIR;
105
106         RETURN(rc);
107 }
108
109 int mdd_lookup(const struct lu_env *env,
110                struct md_object *pobj, const struct lu_name *lname,
111                struct lu_fid *fid, struct md_op_spec *spec)
112 {
113         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
114         int rc;
115         ENTRY;
116
117         rc = mdd_la_get(env, md2mdd_obj(pobj), pattr, BYPASS_CAPA);
118         if (rc != 0)
119                 RETURN(rc);
120
121         rc = __mdd_lookup(env, pobj, pattr, lname, fid, MAY_EXEC);
122         RETURN(rc);
123 }
124
125 static inline int mdd_parent_fid(const struct lu_env *env,
126                                  struct mdd_object *obj,
127                                  const struct lu_attr *attr,
128                                  struct lu_fid *fid)
129 {
130         return __mdd_lookup(env, &obj->mod_obj, attr, &lname_dotdot, fid, 0);
131 }
132
133 /*
134  * For root fid use special function, which does not compare version component
135  * of fid. Version component is different for root fids on all MDTs.
136  */
137 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
138 {
139         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
140                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
141 }
142
143 /*
144  * return 1: if lf is the fid of the ancestor of p1;
145  * return 0: if not;
146  *
147  * return -EREMOTE: if remote object is found, in this
148  * case fid of remote object is saved to @pf;
149  *
150  * otherwise: values < 0, errors.
151  */
152 static int mdd_is_parent(const struct lu_env *env,
153                         struct mdd_device *mdd,
154                         struct mdd_object *p1,
155                         const struct lu_attr *attr,
156                         const struct lu_fid *lf,
157                         struct lu_fid *pf)
158 {
159         struct mdd_object *parent = NULL;
160         struct lu_fid *pfid;
161         int rc;
162         ENTRY;
163
164         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
165         pfid = &mdd_env_info(env)->mti_fid;
166
167         /* Check for root first. */
168         if (mdd_is_root(mdd, mdo2fid(p1)))
169                 RETURN(0);
170
171         for(;;) {
172                 /* this is done recursively, bypass capa for each obj */
173                 mdd_set_capainfo(env, 4, p1, BYPASS_CAPA);
174                 rc = mdd_parent_fid(env, p1, attr, pfid);
175                 if (rc)
176                         GOTO(out, rc);
177                 if (mdd_is_root(mdd, pfid))
178                         GOTO(out, rc = 0);
179                 if (lu_fid_eq(pfid, lf))
180                         GOTO(out, rc = 1);
181                 if (parent)
182                         mdd_object_put(env, parent);
183
184                 parent = mdd_object_find(env, mdd, pfid);
185                 if (IS_ERR(parent)) {
186                         GOTO(out, rc = PTR_ERR(parent));
187                 } else if (mdd_object_remote(parent)) {
188                         /*FIXME: Because of the restriction of rename in Phase I.
189                          * If the parent is remote, we just assumed lf is not the
190                          * parent of P1 for now */
191                         GOTO(out, rc = 0);
192                 }
193                 p1 = parent;
194         }
195         EXIT;
196 out:
197         if (parent && !IS_ERR(parent))
198                 mdd_object_put(env, parent);
199         return rc;
200 }
201
202 /*
203  * No permission check is needed.
204  *
205  * returns 1: if fid is ancestor of @mo;
206  * returns 0: if fid is not a ancestor of @mo;
207  *
208  * returns EREMOTE if remote object is found, fid of remote object is saved to
209  * @fid;
210  *
211  * returns < 0: if error
212  */
213 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
214                   const struct lu_fid *fid, struct lu_fid *sfid)
215 {
216         struct mdd_device *mdd = mdo2mdd(mo);
217         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
218         int rc;
219         ENTRY;
220
221         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
222                 RETURN(0);
223
224         rc = mdd_la_get(env, md2mdd_obj(mo), attr, BYPASS_CAPA);
225         if (rc != 0)
226                 RETURN(rc);
227
228         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
229         if (rc == 0) {
230                 /* found root */
231                 fid_zero(sfid);
232         } else if (rc == 1) {
233                 /* found @fid is parent */
234                 *sfid = *fid;
235                 rc = 0;
236         }
237         RETURN(rc);
238 }
239
240 /*
241  * Check that @dir contains no entries except (possibly) dot and dotdot.
242  *
243  * Returns:
244  *
245  *             0        empty
246  *      -ENOTDIR        not a directory object
247  *    -ENOTEMPTY        not empty
248  *           -ve        other error
249  *
250  */
251 static int mdd_dir_is_empty(const struct lu_env *env,
252                             struct mdd_object *dir)
253 {
254         struct dt_it     *it;
255         struct dt_object *obj;
256         const struct dt_it_ops *iops;
257         int result;
258         ENTRY;
259
260         obj = mdd_object_child(dir);
261         if (!dt_try_as_dir(env, obj))
262                 RETURN(-ENOTDIR);
263
264         iops = &obj->do_index_ops->dio_it;
265         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
266         if (!IS_ERR(it)) {
267                 result = iops->get(env, it, (const void *)"");
268                 if (result > 0) {
269                         int i;
270                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
271                                 result = iops->next(env, it);
272                         if (result == 0)
273                                 result = -ENOTEMPTY;
274                         else if (result == +1)
275                                 result = 0;
276                 } else if (result == 0)
277                         /*
278                          * Huh? Index contains no zero key?
279                          */
280                         result = -EIO;
281
282                 iops->put(env, it);
283                 iops->fini(env, it);
284         } else
285                 result = PTR_ERR(it);
286         RETURN(result);
287 }
288
289 static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj,
290                           const struct lu_attr *la)
291 {
292         struct mdd_device *m = mdd_obj2mdd_dev(obj);
293         ENTRY;
294
295         LASSERT(la != NULL);
296
297         if (!S_ISDIR(la->la_mode))
298                 RETURN(0);
299
300         /*
301          * Subdir count limitation can be broken through.
302          */
303         if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
304                 RETURN(-EMLINK);
305         else
306                 RETURN(0);
307 }
308
309 /*
310  * Check whether it may create the cobj under the pobj.
311  * cobj maybe NULL
312  */
313 int mdd_may_create(const struct lu_env *env,
314                    struct mdd_object *pobj, const struct lu_attr *pattr,
315                    struct mdd_object *cobj, int check_perm, int check_nlink)
316 {
317         int rc = 0;
318         ENTRY;
319
320         if (cobj && mdd_object_exists(cobj))
321                 RETURN(-EEXIST);
322
323         if (mdd_is_dead_obj(pobj))
324                 RETURN(-ENOENT);
325
326         if (check_perm)
327                 rc = mdd_permission_internal_locked(env, pobj, pattr,
328                                                     MAY_WRITE | MAY_EXEC,
329                                                     MOR_TGT_PARENT);
330         if (!rc && check_nlink)
331                 rc = __mdd_may_link(env, pobj, pattr);
332
333         RETURN(rc);
334 }
335
336 /*
337  * Check whether can unlink from the pobj in the case of "cobj == NULL".
338  */
339 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
340                    const struct lu_attr *pattr, const struct lu_attr *attr)
341 {
342         int rc;
343         ENTRY;
344
345         if (mdd_is_dead_obj(pobj))
346                 RETURN(-ENOENT);
347
348         if ((attr->la_valid & LA_FLAGS) &&
349             (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
350                 RETURN(-EPERM);
351
352         rc = mdd_permission_internal_locked(env, pobj, pattr,
353                                             MAY_WRITE | MAY_EXEC,
354                                             MOR_TGT_PARENT);
355         if (rc)
356                 RETURN(rc);
357
358         if (mdd_is_append(pobj))
359                 RETURN(-EPERM);
360
361         RETURN(rc);
362 }
363
364 /*
365  * pobj == NULL is remote ops case, under such case, pobj's
366  * VTX feature has been checked already, no need check again.
367  */
368 static inline int mdd_is_sticky(const struct lu_env *env,
369                                 struct mdd_object *pobj,
370                                 const struct lu_attr *pattr,
371                                 struct mdd_object *cobj,
372                                 const struct lu_attr *cattr)
373 {
374         struct lu_ucred *uc = lu_ucred_assert(env);
375
376         if (pobj != NULL) {
377                 LASSERT(pattr != NULL);
378                 if (!(pattr->la_mode & S_ISVTX) ||
379                     (pattr->la_uid == uc->uc_fsuid))
380                         return 0;
381         }
382
383         LASSERT(cattr != NULL);
384         if (cattr->la_uid == uc->uc_fsuid)
385                 return 0;
386
387         return !md_capable(uc, CFS_CAP_FOWNER);
388 }
389
390 static int mdd_may_delete_entry(const struct lu_env *env,
391                                 struct mdd_object *pobj,
392                                 const struct lu_attr *pattr,
393                                 int check_perm)
394 {
395         ENTRY;
396
397         LASSERT(pobj != NULL);
398         if (!mdd_object_exists(pobj))
399                 RETURN(-ENOENT);
400
401         if (mdd_is_dead_obj(pobj))
402                 RETURN(-ENOENT);
403
404         if (check_perm) {
405                 int rc;
406                 rc = mdd_permission_internal_locked(env, pobj, pattr,
407                                             MAY_WRITE | MAY_EXEC,
408                                             MOR_TGT_PARENT);
409                 if (rc)
410                         RETURN(rc);
411         }
412
413         if (mdd_is_append(pobj))
414                 RETURN(-EPERM);
415
416         RETURN(0);
417 }
418
419 /*
420  * Check whether it may delete the cobj from the pobj.
421  * pobj maybe NULL
422  */
423 int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
424                    const struct lu_attr *tpattr, struct mdd_object *tobj,
425                    const struct lu_attr *tattr, const struct lu_attr *cattr,
426                    int check_perm, int check_empty)
427 {
428         int rc = 0;
429         ENTRY;
430
431         if (tpobj) {
432                 LASSERT(tpattr != NULL);
433                 rc = mdd_may_delete_entry(env, tpobj, tpattr, check_perm);
434                 if (rc != 0)
435                         RETURN(rc);
436         }
437
438         if (tobj == NULL)
439                 RETURN(0);
440
441         if (!mdd_object_exists(tobj))
442                 RETURN(-ENOENT);
443
444         if (mdd_is_dead_obj(tobj))
445                 RETURN(-ESTALE);
446
447         if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
448                 RETURN(-EPERM);
449
450         if (mdd_is_immutable(tobj) || mdd_is_append(tobj))
451                 RETURN(-EPERM);
452
453         if ((tattr->la_valid & LA_FLAGS) &&
454             (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
455                 RETURN(-EPERM);
456
457         /* additional check the rename case */
458         if (cattr) {
459                 if (S_ISDIR(cattr->la_mode)) {
460                         struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
461
462                         if (!S_ISDIR(tattr->la_mode))
463                                 RETURN(-ENOTDIR);
464
465                         if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
466                                 RETURN(-EBUSY);
467                 } else if (S_ISDIR(tattr->la_mode))
468                         RETURN(-EISDIR);
469         }
470
471         if (S_ISDIR(tattr->la_mode) && check_empty)
472                 rc = mdd_dir_is_empty(env, tobj);
473
474         RETURN(rc);
475 }
476
477 /*
478  * tgt maybe NULL
479  * has mdd_write_lock on src already, but not on tgt yet
480  */
481 static int mdd_link_sanity_check(const struct lu_env *env,
482                                  struct mdd_object *tgt_obj,
483                                  const struct lu_attr *tattr,
484                                  const struct lu_name *lname,
485                                  struct mdd_object *src_obj,
486                                  const struct lu_attr *cattr)
487 {
488         struct mdd_device *m = mdd_obj2mdd_dev(src_obj);
489         int rc = 0;
490         ENTRY;
491
492         if (!mdd_object_exists(src_obj))
493                 RETURN(-ENOENT);
494
495         if (mdd_is_dead_obj(src_obj))
496                 RETURN(-ESTALE);
497
498         /* Local ops, no lookup before link, check filename length here. */
499         if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
500                 RETURN(-ENAMETOOLONG);
501
502         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
503                 RETURN(-EPERM);
504
505         if (S_ISDIR(mdd_object_type(src_obj)))
506                 RETURN(-EPERM);
507
508         LASSERT(src_obj != tgt_obj);
509         if (tgt_obj) {
510                 rc = mdd_may_create(env, tgt_obj, tattr, NULL, 1, 0);
511                 if (rc)
512                         RETURN(rc);
513         }
514
515         rc = __mdd_may_link(env, src_obj, cattr);
516
517         RETURN(rc);
518 }
519
520 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
521                                    const char *name, struct thandle *handle,
522                                    struct lustre_capa *capa)
523 {
524         struct dt_object *next = mdd_object_child(pobj);
525         int               rc;
526         ENTRY;
527
528         if (dt_try_as_dir(env, next)) {
529                 rc = next->do_index_ops->dio_delete(env, next,
530                                                     (struct dt_key *)name,
531                                                     handle, capa);
532         } else
533                 rc = -ENOTDIR;
534
535         RETURN(rc);
536 }
537
538 static int __mdd_index_insert_only(const struct lu_env *env,
539                                    struct mdd_object *pobj,
540                                    const struct lu_fid *lf, const char *name,
541                                    struct thandle *handle,
542                                    struct lustre_capa *capa)
543 {
544         struct dt_object *next = mdd_object_child(pobj);
545         int               rc;
546         ENTRY;
547
548         if (dt_try_as_dir(env, next)) {
549                 struct lu_ucred  *uc = lu_ucred_check(env);
550                 int ignore_quota;
551
552                 ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
553                 rc = next->do_index_ops->dio_insert(env, next,
554                                                     (struct dt_rec*)lf,
555                                                     (const struct dt_key *)name,
556                                                     handle, capa, ignore_quota);
557         } else {
558                 rc = -ENOTDIR;
559         }
560         RETURN(rc);
561 }
562
563 /* insert named index, add reference if isdir */
564 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
565                               const struct lu_fid *lf, const char *name, int is_dir,
566                               struct thandle *handle, struct lustre_capa *capa)
567 {
568         int               rc;
569         ENTRY;
570
571         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
572         if (rc == 0 && is_dir) {
573                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
574                 mdo_ref_add(env, pobj, handle);
575                 mdd_write_unlock(env, pobj);
576         }
577         RETURN(rc);
578 }
579
580 /* delete named index, drop reference if isdir */
581 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
582                               const char *name, int is_dir, struct thandle *handle,
583                               struct lustre_capa *capa)
584 {
585         int               rc;
586         ENTRY;
587
588         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
589         if (rc == 0 && is_dir) {
590                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
591                 mdo_ref_del(env, pobj, handle);
592                 mdd_write_unlock(env, pobj);
593         }
594
595         RETURN(rc);
596 }
597
598 int mdd_declare_changelog_store(const struct lu_env *env,
599                                 struct mdd_device *mdd,
600                                 const struct lu_name *fname,
601                                 struct thandle *handle)
602 {
603         struct obd_device               *obd = mdd2obd_dev(mdd);
604         struct llog_ctxt                *ctxt;
605         struct llog_changelog_rec       *rec;
606         struct lu_buf                   *buf;
607         int                              reclen;
608         int                              rc;
609
610         /* Not recording */
611         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
612                 return 0;
613
614         reclen = llog_data_len(sizeof(*rec) +
615                                (fname != NULL ? fname->ln_namelen : 0));
616         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
617         if (buf->lb_buf == NULL)
618                 return -ENOMEM;
619
620         rec = buf->lb_buf;
621         rec->cr_hdr.lrh_len = reclen;
622         rec->cr_hdr.lrh_type = CHANGELOG_REC;
623
624         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
625         if (ctxt == NULL)
626                 return -ENXIO;
627
628         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
629         llog_ctxt_put(ctxt);
630
631         return rc;
632 }
633
634 static int mdd_declare_changelog_ext_store(const struct lu_env *env,
635                                            struct mdd_device *mdd,
636                                            const struct lu_name *tname,
637                                            const struct lu_name *sname,
638                                            struct thandle *handle)
639 {
640         struct obd_device               *obd = mdd2obd_dev(mdd);
641         struct llog_ctxt                *ctxt;
642         struct llog_changelog_ext_rec   *rec;
643         struct lu_buf                   *buf;
644         int                              reclen;
645         int                              rc;
646
647         /* Not recording */
648         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
649                 return 0;
650
651         reclen = llog_data_len(sizeof(*rec) +
652                                (tname != NULL ? tname->ln_namelen : 0) +
653                                (sname != NULL ? 1 + sname->ln_namelen : 0));
654         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
655         if (buf->lb_buf == NULL)
656                 return -ENOMEM;
657
658         rec = buf->lb_buf;
659         rec->cr_hdr.lrh_len = reclen;
660         rec->cr_hdr.lrh_type = CHANGELOG_REC;
661
662         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
663         if (ctxt == NULL)
664                 return -ENXIO;
665
666         rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle);
667         llog_ctxt_put(ctxt);
668
669         return rc;
670 }
671
672 /** Add a changelog entry \a rec to the changelog llog
673  * \param mdd
674  * \param rec
675  * \param handle - currently ignored since llogs start their own transaction;
676  *                 this will hopefully be fixed in llog rewrite
677  * \retval 0 ok
678  */
679 int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
680                         struct llog_changelog_rec *rec, struct thandle *th)
681 {
682         struct obd_device       *obd = mdd2obd_dev(mdd);
683         struct llog_ctxt        *ctxt;
684         int                      rc;
685
686         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
687         rec->cr_hdr.lrh_type = CHANGELOG_REC;
688         rec->cr.cr_time = cl_time();
689
690         spin_lock(&mdd->mdd_cl.mc_lock);
691         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
692          * but as long as the MDD transactions are ordered correctly for e.g.
693          * rename conflicts, I don't think this should matter. */
694         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
695         spin_unlock(&mdd->mdd_cl.mc_lock);
696
697         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
698         if (ctxt == NULL)
699                 return -ENXIO;
700
701         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
702         llog_ctxt_put(ctxt);
703         if (rc > 0)
704                 rc = 0;
705         return rc;
706 }
707
708 /** Add a changelog_ext entry \a rec to the changelog llog
709  * \param mdd
710  * \param rec
711  * \param handle - currently ignored since llogs start their own transaction;
712  *              this will hopefully be fixed in llog rewrite
713  * \retval 0 ok
714  */
715 int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
716                             struct llog_changelog_ext_rec *rec,
717                             struct thandle *th)
718 {
719         struct obd_device       *obd = mdd2obd_dev(mdd);
720         struct llog_ctxt        *ctxt;
721         int                      rc;
722
723         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
724         /* llog_lvfs_write_rec sets the llog tail len */
725         rec->cr_hdr.lrh_type = CHANGELOG_REC;
726         rec->cr.cr_time = cl_time();
727
728         spin_lock(&mdd->mdd_cl.mc_lock);
729         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
730          * but as long as the MDD transactions are ordered correctly for e.g.
731          * rename conflicts, I don't think this should matter. */
732         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
733         spin_unlock(&mdd->mdd_cl.mc_lock);
734
735         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
736         if (ctxt == NULL)
737                 return -ENXIO;
738
739         /* nested journal transaction */
740         rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th);
741         llog_ctxt_put(ctxt);
742         if (rc > 0)
743                 rc = 0;
744
745         return rc;
746 }
747
748 /** Store a namespace change changelog record
749  * If this fails, we must fail the whole transaction; we don't
750  * want the change to commit without the log entry.
751  * \param target - mdd_object of change
752  * \param parent - parent dir/object
753  * \param tname - target name string
754  * \param handle - transacion handle
755  */
756 int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
757                            enum changelog_rec_type type, unsigned flags,
758                            struct mdd_object *target, struct mdd_object *parent,
759                            const struct lu_name *tname, struct thandle *handle)
760 {
761         struct llog_changelog_rec *rec;
762         struct lu_buf *buf;
763         int reclen;
764         int rc;
765         ENTRY;
766
767         /* Not recording */
768         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
769                 RETURN(0);
770         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
771                 RETURN(0);
772
773         LASSERT(target != NULL);
774         LASSERT(parent != NULL);
775         LASSERT(tname != NULL);
776         LASSERT(handle != NULL);
777
778         reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
779         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
780         if (buf->lb_buf == NULL)
781                 RETURN(-ENOMEM);
782         rec = buf->lb_buf;
783
784         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
785         rec->cr.cr_type = (__u32)type;
786         rec->cr.cr_tfid = *mdo2fid(target);
787         rec->cr.cr_pfid = *mdo2fid(parent);
788         rec->cr.cr_namelen = tname->ln_namelen;
789         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
790
791         target->mod_cltime = cfs_time_current_64();
792
793         rc = mdd_changelog_store(env, mdd, rec, handle);
794         if (rc < 0) {
795                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
796                         rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid),
797                         PFID(&rec->cr.cr_pfid));
798                 RETURN(-EFAULT);
799         }
800
801         RETURN(0);
802 }
803
804
805 /** Store a namespace change changelog record
806  * If this fails, we must fail the whole transaction; we don't
807  * want the change to commit without the log entry.
808  * \param target - mdd_object of change
809  * \param tpfid - target parent dir/object fid
810  * \param sfid - source object fid
811  * \param spfid - source parent fid
812  * \param tname - target name string
813  * \param sname - source name string
814  * \param handle - transacion handle
815  */
816 static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
817                                       struct mdd_device    *mdd,
818                                       enum changelog_rec_type type,
819                                       unsigned flags,
820                                       struct mdd_object    *target,
821                                       const struct lu_fid  *tpfid,
822                                       const struct lu_fid  *sfid,
823                                       const struct lu_fid  *spfid,
824                                       const struct lu_name *tname,
825                                       const struct lu_name *sname,
826                                       struct thandle *handle)
827 {
828         struct llog_changelog_ext_rec *rec;
829         struct lu_buf *buf;
830         int reclen;
831         int rc;
832         ENTRY;
833
834         /* Not recording */
835         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
836                 RETURN(0);
837         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
838                 RETURN(0);
839
840         LASSERT(sfid != NULL);
841         LASSERT(tpfid != NULL);
842         LASSERT(tname != NULL);
843         LASSERT(handle != NULL);
844
845         reclen = llog_data_len(sizeof(*rec) +
846                                sname != NULL ? 1 + sname->ln_namelen : 0);
847         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
848         if (buf->lb_buf == NULL)
849                 RETURN(-ENOMEM);
850         rec = buf->lb_buf;
851
852         rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags);
853         rec->cr.cr_type = (__u32)type;
854         rec->cr.cr_pfid = *tpfid;
855         rec->cr.cr_sfid = *sfid;
856         rec->cr.cr_spfid = *spfid;
857         rec->cr.cr_namelen = tname->ln_namelen;
858         memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
859         if (sname) {
860                 rec->cr.cr_name[tname->ln_namelen] = '\0';
861                 memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
862                         sname->ln_namelen);
863                 rec->cr.cr_namelen += 1 + sname->ln_namelen;
864         }
865
866         if (likely(target != NULL)) {
867                 rec->cr.cr_tfid = *mdo2fid(target);
868                 target->mod_cltime = cfs_time_current_64();
869         } else {
870                 fid_zero(&rec->cr.cr_tfid);
871         }
872
873         rc = mdd_changelog_ext_store(env, mdd, rec, handle);
874         if (rc < 0) {
875                 CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
876                         rc, type, tname->ln_name, PFID(sfid), PFID(tpfid));
877                 return -EFAULT;
878         }
879
880         return 0;
881 }
882
883 static int __mdd_links_add(const struct lu_env *env,
884                            struct mdd_object *mdd_obj,
885                            struct linkea_data *ldata,
886                            const struct lu_name *lname,
887                            const struct lu_fid *pfid,
888                            int first, int check)
889 {
890         int rc;
891
892         if (ldata->ld_leh == NULL) {
893                 rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
894                 if (rc) {
895                         if (rc != -ENODATA)
896                                 return rc;
897                         rc = linkea_data_new(ldata,
898                                              &mdd_env_info(env)->mti_link_buf);
899                         if (rc)
900                                 return rc;
901                 }
902         }
903
904         if (check) {
905                 rc = linkea_links_find(ldata, lname, pfid);
906                 if (rc && rc != -ENOENT)
907                         return rc;
908                 if (rc == 0)
909                         return -EEXIST;
910         }
911
912         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
913                 struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
914
915                 *tfid = *pfid;
916                 tfid->f_ver = ~0;
917                 linkea_add_buf(ldata, lname, tfid);
918         }
919
920         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE2))
921                 linkea_add_buf(ldata, lname, pfid);
922
923         return linkea_add_buf(ldata, lname, pfid);
924 }
925
926 static int __mdd_links_del(const struct lu_env *env,
927                            struct mdd_object *mdd_obj,
928                            struct linkea_data *ldata,
929                            const struct lu_name *lname,
930                            const struct lu_fid *pfid)
931 {
932         int rc;
933
934         if (ldata->ld_leh == NULL) {
935                 rc = mdd_links_read(env, mdd_obj, ldata);
936                 if (rc)
937                         return rc;
938         }
939
940         rc = linkea_links_find(ldata, lname, pfid);
941         if (rc)
942                 return rc;
943
944         linkea_del_buf(ldata, lname);
945         return 0;
946 }
947
948 static int mdd_linkea_prepare(const struct lu_env *env,
949                               struct mdd_object *mdd_obj,
950                               const struct lu_fid *oldpfid,
951                               const struct lu_name *oldlname,
952                               const struct lu_fid *newpfid,
953                               const struct lu_name *newlname,
954                               int first, int check,
955                               struct linkea_data *ldata)
956 {
957         int rc = 0;
958         int rc2 = 0;
959         ENTRY;
960
961         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
962                 return 0;
963
964         LASSERT(oldpfid != NULL || newpfid != NULL);
965
966         if (mdd_obj->mod_flags & DEAD_OBJ) {
967                 /* Prevent linkea to be updated which is NOT necessary. */
968                 ldata->ld_reclen = 0;
969                 /* No more links, don't bother */
970                 RETURN(0);
971         }
972
973         if (oldpfid != NULL) {
974                 rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
975                 if (rc) {
976                         if ((check == 1) ||
977                             (rc != -ENODATA && rc != -ENOENT))
978                                 RETURN(rc);
979                         /* No changes done. */
980                         rc = 0;
981                 }
982         }
983
984         /* If renaming, add the new record */
985         if (newpfid != NULL) {
986                 /* even if the add fails, we still delete the out-of-date
987                  * old link */
988                 rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
989                                       first, check);
990         }
991
992         rc = rc != 0 ? rc : rc2;
993
994         RETURN(rc);
995 }
996
997 int mdd_links_rename(const struct lu_env *env,
998                      struct mdd_object *mdd_obj,
999                      const struct lu_fid *oldpfid,
1000                      const struct lu_name *oldlname,
1001                      const struct lu_fid *newpfid,
1002                      const struct lu_name *newlname,
1003                      struct thandle *handle,
1004                      struct linkea_data *ldata,
1005                      int first, int check)
1006 {
1007         int rc2 = 0;
1008         int rc = 0;
1009         ENTRY;
1010
1011         if (ldata == NULL) {
1012                 ldata = &mdd_env_info(env)->mti_link_data;
1013                 memset(ldata, 0, sizeof(*ldata));
1014                 rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
1015                                         newpfid, newlname, first, check,
1016                                         ldata);
1017                 if (rc != 0)
1018                         GOTO(out, rc);
1019         }
1020
1021         if (ldata->ld_reclen != 0)
1022                 rc = mdd_links_write(env, mdd_obj, ldata, handle);
1023         EXIT;
1024 out:
1025         if (rc == 0)
1026                 rc = rc2;
1027         if (rc) {
1028                 int error = 1;
1029                 if (rc == -EOVERFLOW || rc == -ENOSPC)
1030                         error = 0;
1031                 if (oldpfid == NULL)
1032                         CDEBUG(error ? D_ERROR : D_OTHER,
1033                                "link_ea add '%.*s' failed %d "DFID"\n",
1034                                newlname->ln_namelen, newlname->ln_name,
1035                                rc, PFID(mdd_object_fid(mdd_obj)));
1036                 else if (newpfid == NULL)
1037                         CDEBUG(error ? D_ERROR : D_OTHER,
1038                                "link_ea del '%.*s' failed %d "DFID"\n",
1039                                oldlname->ln_namelen, oldlname->ln_name,
1040                                rc, PFID(mdd_object_fid(mdd_obj)));
1041                 else
1042                         CDEBUG(error ? D_ERROR : D_OTHER,
1043                                "link_ea rename '%.*s'->'%.*s' failed %d "
1044                                DFID"\n",
1045                                oldlname->ln_namelen, oldlname->ln_name,
1046                                newlname->ln_namelen, newlname->ln_name,
1047                                rc, PFID(mdd_object_fid(mdd_obj)));
1048         }
1049
1050         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1051                 /* if we vmalloced a large buffer drop it */
1052                 lu_buf_free(ldata->ld_buf);
1053
1054         return rc;
1055 }
1056
1057 static inline int mdd_links_add(const struct lu_env *env,
1058                                 struct mdd_object *mdd_obj,
1059                                 const struct lu_fid *pfid,
1060                                 const struct lu_name *lname,
1061                                 struct thandle *handle,
1062                                 struct linkea_data *data, int first)
1063 {
1064         return mdd_links_rename(env, mdd_obj, NULL, NULL,
1065                                 pfid, lname, handle, data, first, 0);
1066 }
1067
1068 static inline int mdd_links_del(const struct lu_env *env,
1069                                 struct mdd_object *mdd_obj,
1070                                 const struct lu_fid *pfid,
1071                                 const struct lu_name *lname,
1072                                 struct thandle *handle)
1073 {
1074         return mdd_links_rename(env, mdd_obj, pfid, lname,
1075                                 NULL, NULL, handle, NULL, 0, 0);
1076 }
1077
1078 /** Read the link EA into a temp buffer.
1079  * Uses the mdd_thread_info::mti_big_buf since it is generally large.
1080  * A pointer to the buffer is stored in \a ldata::ld_buf.
1081  *
1082  * \retval 0 or error
1083  */
1084 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
1085                    struct linkea_data *ldata)
1086 {
1087         int rc;
1088
1089         /* First try a small buf */
1090         LASSERT(env != NULL);
1091         ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
1092                                                PAGE_CACHE_SIZE);
1093         if (ldata->ld_buf->lb_buf == NULL)
1094                 return -ENOMEM;
1095
1096         if (!mdd_object_exists(mdd_obj))
1097                 return -ENODATA;
1098
1099         rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
1100                           BYPASS_CAPA);
1101         if (rc == -ERANGE) {
1102                 /* Buf was too small, figure out what we need. */
1103                 lu_buf_free(ldata->ld_buf);
1104                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1105                                    XATTR_NAME_LINK, BYPASS_CAPA);
1106                 if (rc < 0)
1107                         return rc;
1108                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
1109                 if (ldata->ld_buf->lb_buf == NULL)
1110                         return -ENOMEM;
1111                 rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
1112                                   XATTR_NAME_LINK, BYPASS_CAPA);
1113         }
1114         if (rc < 0)
1115                 return rc;
1116
1117         return linkea_init(ldata);
1118 }
1119
1120 /** Read the link EA into a temp buffer.
1121  * Uses the name_buf since it is generally large.
1122  * \retval IS_ERR err
1123  * \retval ptr to \a lu_buf (always \a mti_big_buf)
1124  */
1125 struct lu_buf *mdd_links_get(const struct lu_env *env,
1126                              struct mdd_object *mdd_obj)
1127 {
1128         struct linkea_data ldata = { 0 };
1129         int rc;
1130
1131         rc = mdd_links_read(env, mdd_obj, &ldata);
1132         return rc ? ERR_PTR(rc) : ldata.ld_buf;
1133 }
1134
1135 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
1136                     struct linkea_data *ldata, struct thandle *handle)
1137 {
1138         const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
1139                                                      ldata->ld_leh->leh_len);
1140         return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
1141                              mdd_object_capa(env, mdd_obj));
1142 }
1143
1144 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
1145                           struct thandle *handle, struct linkea_data *ldata)
1146 {
1147         int     rc;
1148         int     ea_len;
1149         void    *linkea;
1150
1151         if (ldata != NULL && ldata->ld_lee != NULL) {
1152                 ea_len = ldata->ld_leh->leh_len;
1153                 linkea = ldata->ld_buf->lb_buf;
1154         } else {
1155                 ea_len = DEFAULT_LINKEA_SIZE;
1156                 linkea = NULL;
1157         }
1158
1159         /* XXX: max size? */
1160         rc = mdo_declare_xattr_set(env, mdd_obj,
1161                                    mdd_buf_get_const(env, linkea, ea_len),
1162                                    XATTR_NAME_LINK, 0, handle);
1163         return rc;
1164 }
1165
1166 static inline int mdd_declare_links_del(const struct lu_env *env,
1167                                         struct mdd_object *c,
1168                                         struct thandle *handle)
1169 {
1170         int rc = 0;
1171
1172         /* For directory, the linkEA will be removed together
1173          * with the object. */
1174         if (!S_ISDIR(mdd_object_type(c)))
1175                 rc = mdd_declare_links_add(env, c, handle, NULL);
1176
1177         return rc;
1178 }
1179
1180 static int mdd_declare_link(const struct lu_env *env,
1181                             struct mdd_device *mdd,
1182                             struct mdd_object *p,
1183                             struct mdd_object *c,
1184                             const struct lu_name *name,
1185                             struct thandle *handle,
1186                             struct lu_attr *la,
1187                             struct linkea_data *data)
1188 {
1189         int rc;
1190
1191         rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
1192         if (rc)
1193                 return rc;
1194
1195         rc = mdo_declare_ref_add(env, c, handle);
1196         if (rc)
1197                 return rc;
1198
1199         la->la_valid = LA_CTIME | LA_MTIME;
1200         rc = mdo_declare_attr_set(env, p, la, handle);
1201         if (rc != 0)
1202                 return rc;
1203
1204         la->la_valid = LA_CTIME;
1205         rc = mdo_declare_attr_set(env, c, la, handle);
1206         if (rc)
1207                 return rc;
1208
1209         rc = mdd_declare_links_add(env, c, handle, data);
1210         if (rc)
1211                 return rc;
1212
1213         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1214
1215         return rc;
1216 }
1217
1218 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
1219                     struct md_object *src_obj, const struct lu_name *lname,
1220                     struct md_attr *ma)
1221 {
1222         const char *name = lname->ln_name;
1223         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1224         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
1225         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
1226         struct lu_attr    *cattr = MDD_ENV_VAR(env, cattr);
1227         struct lu_attr    *tattr = MDD_ENV_VAR(env, tattr);
1228         struct mdd_device *mdd = mdo2mdd(src_obj);
1229         struct thandle *handle;
1230         struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
1231         int rc;
1232         ENTRY;
1233
1234         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
1235         if (rc != 0)
1236                 RETURN(rc);
1237
1238         rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
1239         if (rc != 0)
1240                 RETURN(rc);
1241
1242         handle = mdd_trans_create(env, mdd);
1243         if (IS_ERR(handle))
1244                 GOTO(out_pending, rc = PTR_ERR(handle));
1245
1246         memset(ldata, 0, sizeof(*ldata));
1247
1248         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1249         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1250
1251         rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
1252                               la, ldata);
1253         if (rc)
1254                 GOTO(stop, rc);
1255
1256         rc = mdd_trans_start(env, mdd, handle);
1257         if (rc)
1258                 GOTO(stop, rc);
1259
1260         mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
1261         rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
1262                                    cattr);
1263         if (rc)
1264                 GOTO(out_unlock, rc);
1265
1266         rc = mdo_ref_add(env, mdd_sobj, handle);
1267         if (rc)
1268                 GOTO(out_unlock, rc);
1269
1270
1271         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
1272                                      name, handle,
1273                                      mdd_object_capa(env, mdd_tobj));
1274         if (rc != 0) {
1275                 mdo_ref_del(env, mdd_sobj, handle);
1276                 GOTO(out_unlock, rc);
1277         }
1278
1279         la->la_valid = LA_CTIME | LA_MTIME;
1280         rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
1281         if (rc)
1282                 GOTO(out_unlock, rc);
1283
1284         la->la_valid = LA_CTIME;
1285         rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
1286         if (rc == 0) {
1287                 rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
1288                                         mdo2fid(mdd_tobj), lname, 0, 0,
1289                                         ldata);
1290                 if (rc == 0)
1291                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
1292                                       lname, handle, ldata, 0);
1293                 /* The failure of links_add should not cause the link
1294                  * failure, reset rc here */
1295                 rc = 0;
1296         }
1297         EXIT;
1298 out_unlock:
1299         mdd_write_unlock(env, mdd_sobj);
1300         if (rc == 0)
1301                 rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
1302                                             mdd_tobj, lname, handle);
1303 stop:
1304         mdd_trans_stop(env, mdd, rc, handle);
1305
1306         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
1307                 /* if we vmalloced a large buffer drop it */
1308                 lu_buf_free(ldata->ld_buf);
1309 out_pending:
1310         return rc;
1311 }
1312
1313 int mdd_declare_finish_unlink(const struct lu_env *env,
1314                               struct mdd_object *obj,
1315                               struct md_attr *ma,
1316                               struct thandle *handle)
1317 {
1318         int     rc;
1319
1320         rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
1321         if (rc)
1322                 return rc;
1323
1324         return mdo_declare_destroy(env, obj, handle);
1325 }
1326
1327 /* caller should take a lock before calling */
1328 int mdd_finish_unlink(const struct lu_env *env,
1329                       struct mdd_object *obj, struct md_attr *ma,
1330                       struct thandle *th)
1331 {
1332         int rc = 0;
1333         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
1334         ENTRY;
1335
1336         LASSERT(mdd_write_locked(env, obj) != 0);
1337
1338         if (ma->ma_attr.la_nlink == 0 || is_dir) {
1339                 obj->mod_flags |= DEAD_OBJ;
1340
1341                 /* add new orphan and the object
1342                  * will be deleted during mdd_close() */
1343                 if (obj->mod_count) {
1344                         rc = __mdd_orphan_add(env, obj, th);
1345                         if (rc == 0)
1346                                 CDEBUG(D_HA, "Object "DFID" is inserted into "
1347                                         "orphan list, open count = %d\n",
1348                                         PFID(mdd_object_fid(obj)),
1349                                         obj->mod_count);
1350                         else
1351                                 CERROR("Object "DFID" fail to be an orphan, "
1352                                        "open count = %d, maybe cause failed "
1353                                        "open replay\n",
1354                                         PFID(mdd_object_fid(obj)),
1355                                         obj->mod_count);
1356                 } else {
1357                         rc = mdo_destroy(env, obj, th);
1358                 }
1359         }
1360
1361         RETURN(rc);
1362 }
1363
1364 /*
1365  * pobj maybe NULL
1366  * has mdd_write_lock on cobj already, but not on pobj yet
1367  */
1368 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
1369                             const struct lu_attr *pattr,
1370                             struct mdd_object *cobj,
1371                             const struct lu_attr *cattr)
1372 {
1373         int rc;
1374         ENTRY;
1375
1376         rc = mdd_may_delete(env, pobj, pattr, cobj, cattr, NULL, 1, 1);
1377
1378         RETURN(rc);
1379 }
1380
1381 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
1382                               struct mdd_object *p, struct mdd_object *c,
1383                               const struct lu_name *name, struct md_attr *ma,
1384                               struct thandle *handle, int no_name)
1385 {
1386         struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
1387         int rc;
1388
1389         if (likely(no_name == 0)) {
1390                 rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
1391                 if (rc)
1392                         return rc;
1393         }
1394
1395         rc = mdo_declare_ref_del(env, p, handle);
1396         if (rc)
1397                 return rc;
1398
1399         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1400         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1401         la->la_valid = LA_CTIME | LA_MTIME;
1402         rc = mdo_declare_attr_set(env, p, la, handle);
1403         if (rc)
1404                 return rc;
1405
1406         if (c != NULL) {
1407                 rc = mdo_declare_ref_del(env, c, handle);
1408                 if (rc)
1409                         return rc;
1410
1411                 rc = mdo_declare_ref_del(env, c, handle);
1412                 if (rc)
1413                         return rc;
1414
1415                 la->la_valid = LA_CTIME;
1416                 rc = mdo_declare_attr_set(env, c, la, handle);
1417                 if (rc)
1418                         return rc;
1419
1420                 rc = mdd_declare_finish_unlink(env, c, ma, handle);
1421                 if (rc)
1422                         return rc;
1423
1424                 rc = mdd_declare_links_del(env, c, handle);
1425                 if (rc != 0)
1426                         return rc;
1427
1428                 /* FIXME: need changelog for remove entry */
1429                 rc = mdd_declare_changelog_store(env, mdd, name, handle);
1430         }
1431
1432         return rc;
1433 }
1434
1435 /*
1436  * test if a file has an HSM archive
1437  * if HSM attributes are not found in ma update them from
1438  * HSM xattr
1439  */
1440 static bool mdd_hsm_archive_exists(const struct lu_env *env,
1441                                    struct mdd_object *obj,
1442                                    struct md_attr *ma)
1443 {
1444         ENTRY;
1445
1446         if (!(ma->ma_valid & MA_HSM)) {
1447                 /* no HSM MD provided, read xattr */
1448                 struct lu_buf   *hsm_buf;
1449                 const size_t     buflen = sizeof(struct hsm_attrs);
1450                 int              rc;
1451
1452                 hsm_buf = mdd_buf_get(env, NULL, 0);
1453                 lu_buf_alloc(hsm_buf, buflen);
1454                 rc = mdo_xattr_get(env, obj, hsm_buf, XATTR_NAME_HSM,
1455                                    mdd_object_capa(env, obj));
1456                 rc = lustre_buf2hsm(hsm_buf->lb_buf, rc, &ma->ma_hsm);
1457                 lu_buf_free(hsm_buf);
1458                 if (rc < 0)
1459                         RETURN(false);
1460
1461                 ma->ma_valid = MA_HSM;
1462         }
1463         if (ma->ma_hsm.mh_flags & HS_EXISTS)
1464                 RETURN(true);
1465         RETURN(false);
1466 }
1467
1468 /**
1469  * Delete name entry and the object.
1470  * Note: no_name == 1 means it only destory the object, i.e. name_entry
1471  * does not exist for this object, and it could only happen during resending
1472  * of remote unlink. see the comments in mdt_reint_unlink. Unfortunately, lname
1473  * is also needed in this case(needed by changelog), so we have to add another
1474  * parameter(no_name)here. XXX: this is only needed in DNE phase I, on Phase II,
1475  * the ENOENT failure should be able to be fixed by redo mechanism.
1476  */
1477 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
1478                       struct md_object *cobj, const struct lu_name *lname,
1479                       struct md_attr *ma, int no_name)
1480 {
1481         const char *name = lname->ln_name;
1482         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
1483         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
1484         struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1485         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1486         struct mdd_object *mdd_cobj = NULL;
1487         struct mdd_device *mdd = mdo2mdd(pobj);
1488         struct thandle    *handle;
1489         int rc, is_dir = 0;
1490         ENTRY;
1491
1492         /* cobj == NULL means only delete name entry */
1493         if (likely(cobj != NULL)) {
1494                 mdd_cobj = md2mdd_obj(cobj);
1495                 if (mdd_object_exists(mdd_cobj) == 0)
1496                         RETURN(-ENOENT);
1497         }
1498
1499         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
1500         if (rc)
1501                 RETURN(rc);
1502
1503         if (likely(mdd_cobj != NULL)) {
1504                 /* fetch cattr */
1505                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1506                 if (rc)
1507                         RETURN(rc);
1508
1509                 is_dir = S_ISDIR(cattr->la_mode);
1510         }
1511
1512         rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
1513         if (rc)
1514                 RETURN(rc);
1515
1516         handle = mdd_trans_create(env, mdd);
1517         if (IS_ERR(handle))
1518                 RETURN(PTR_ERR(handle));
1519
1520         rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
1521                                 lname, ma, handle, no_name);
1522         if (rc)
1523                 GOTO(stop, rc);
1524
1525         rc = mdd_trans_start(env, mdd, handle);
1526         if (rc)
1527                 GOTO(stop, rc);
1528
1529         if (likely(mdd_cobj != NULL))
1530                 mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
1531
1532         if (likely(no_name == 0)) {
1533                 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
1534                                         mdd_object_capa(env, mdd_pobj));
1535                 if (rc)
1536                         GOTO(cleanup, rc);
1537         }
1538
1539         if (likely(mdd_cobj != NULL)) {
1540                 rc = mdo_ref_del(env, mdd_cobj, handle);
1541                 if (rc != 0) {
1542                         __mdd_index_insert_only(env, mdd_pobj,
1543                                                 mdo2fid(mdd_cobj),
1544                                                 name, handle,
1545                                                 mdd_object_capa(env, mdd_pobj));
1546                         GOTO(cleanup, rc);
1547                 }
1548
1549                 if (is_dir)
1550                         /* unlink dot */
1551                         mdo_ref_del(env, mdd_cobj, handle);
1552
1553                 /* fetch updated nlink */
1554                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1555                 if (rc)
1556                         GOTO(cleanup, rc);
1557         }
1558
1559         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1560         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
1561
1562         la->la_valid = LA_CTIME | LA_MTIME;
1563         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
1564         if (rc)
1565                 GOTO(cleanup, rc);
1566
1567         /* Enough for only unlink the entry */
1568         if (unlikely(mdd_cobj == NULL))
1569                 GOTO(stop, rc);
1570
1571         if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
1572                 /* update ctime of an unlinked file only if it is still
1573                  * opened or a link still exists */
1574                 la->la_valid = LA_CTIME;
1575                 rc = mdd_update_time(env, mdd_cobj, cattr, la, handle);
1576                 if (rc)
1577                         GOTO(cleanup, rc);
1578         }
1579
1580         /* XXX: this transfer to ma will be removed with LOD/OSP */
1581         ma->ma_attr = *cattr;
1582         ma->ma_valid |= MA_INODE;
1583         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
1584
1585         /* fetch updated nlink */
1586         if (rc == 0)
1587                 rc = mdd_la_get(env, mdd_cobj, cattr, BYPASS_CAPA);
1588
1589         if (!is_dir)
1590                 /* old files may not have link ea; ignore errors */
1591                 mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle);
1592
1593         /* if object is removed then we can't get its attrs, use last get */
1594         if (cattr->la_nlink == 0) {
1595                 ma->ma_attr = *cattr;
1596                 ma->ma_valid |= MA_INODE;
1597         }
1598         EXIT;
1599 cleanup:
1600         mdd_write_unlock(env, mdd_cobj);
1601         if (rc == 0) {
1602                 int cl_flags = 0;
1603
1604                 if (cattr->la_nlink == 0) {
1605                         cl_flags |= CLF_UNLINK_LAST;
1606                         /* search for an existing archive */
1607                         if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
1608                                 cl_flags |= CLF_UNLINK_HSM_EXISTS;
1609                 }
1610
1611                 rc = mdd_changelog_ns_store(env, mdd,
1612                         is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
1613                         mdd_cobj, mdd_pobj, lname, handle);
1614         }
1615
1616 stop:
1617         mdd_trans_stop(env, mdd, rc, handle);
1618
1619         return rc;
1620 }
1621
1622 /*
1623  * The permission has been checked when obj created, no need check again.
1624  */
1625 static int mdd_cd_sanity_check(const struct lu_env *env,
1626                                struct mdd_object *obj)
1627 {
1628         ENTRY;
1629
1630         /* EEXIST check */
1631         if (!obj || mdd_is_dead_obj(obj))
1632                 RETURN(-ENOENT);
1633
1634         RETURN(0);
1635 }
1636
1637 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
1638                            struct md_object *cobj, const struct md_op_spec *spec,
1639                            struct md_attr *ma)
1640 {
1641         struct mdd_device *mdd = mdo2mdd(cobj);
1642         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1643         struct mdd_object *son = md2mdd_obj(cobj);
1644         struct thandle    *handle;
1645         const struct lu_buf *buf;
1646         struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
1647         int                rc;
1648         ENTRY;
1649
1650         rc = mdd_cd_sanity_check(env, son);
1651         if (rc)
1652                 RETURN(rc);
1653
1654         if (!md_should_create(spec->sp_cr_flags))
1655                 RETURN(0);
1656
1657         /*
1658          * there are following use cases for this function:
1659          * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE
1660          *    striping can be specified or not
1661          * 2) CMD?
1662          */
1663         rc = mdd_la_get(env, son, attr, BYPASS_CAPA);
1664         if (rc)
1665                 RETURN(rc);
1666
1667         /* calling ->ah_make_hint() is used to transfer information from parent */
1668         mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
1669
1670         handle = mdd_trans_create(env, mdd);
1671         if (IS_ERR(handle))
1672                 GOTO(out_free, rc = PTR_ERR(handle));
1673
1674         /*
1675          * XXX: Setting the lov ea is not locked but setting the attr is locked?
1676          * Should this be fixed?
1677          */
1678         CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
1679                spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
1680                spec->sp_cr_flags, spec->no_create);
1681
1682         if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
1683                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1684                                         spec->u.sp_ea.eadatalen);
1685         } else {
1686                 buf = &LU_BUF_NULL;
1687         }
1688
1689         rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
1690                                   XATTR_NAME_LOV, 0, handle);
1691         if (rc)
1692                 GOTO(stop, rc);
1693
1694         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1695         if (rc)
1696                 GOTO(stop, rc);
1697
1698         rc = mdd_trans_start(env, mdd, handle);
1699         if (rc)
1700                 GOTO(stop, rc);
1701
1702         rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
1703                           0, handle, mdd_object_capa(env, son));
1704
1705         if (rc)
1706                 GOTO(stop, rc);
1707
1708         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
1709
1710 stop:
1711         mdd_trans_stop(env, mdd, rc, handle);
1712 out_free:
1713         RETURN(rc);
1714 }
1715
1716 static int mdd_declare_object_initialize(const struct lu_env *env,
1717                                          struct mdd_object *parent,
1718                                          struct mdd_object *child,
1719                                          struct lu_attr *attr,
1720                                          struct thandle *handle,
1721                                          struct linkea_data *ldata)
1722 {
1723         int rc;
1724         ENTRY;
1725
1726         /*
1727          * inode mode has been set in creation time, and it's based on umask,
1728          * la_mode and acl, don't set here again! (which will go wrong
1729          * because below function doesn't consider umask).
1730          * I'd suggest set all object attributes in creation time, see above.
1731          */
1732         LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
1733         attr->la_valid &= ~(LA_MODE | LA_TYPE);
1734         rc = mdo_declare_attr_set(env, child, attr, handle);
1735         attr->la_valid |= LA_MODE | LA_TYPE;
1736         if (rc == 0 && S_ISDIR(attr->la_mode)) {
1737                 rc = mdo_declare_index_insert(env, child, mdo2fid(child),
1738                                               dot, handle);
1739                 if (rc == 0)
1740                         rc = mdo_declare_ref_add(env, child, handle);
1741
1742                 rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
1743                                               dotdot, handle);
1744         }
1745
1746         if (rc == 0)
1747                 mdd_declare_links_add(env, child, handle, ldata);
1748
1749         RETURN(rc);
1750 }
1751
1752 static int mdd_object_initialize(const struct lu_env *env,
1753                                  const struct lu_fid *pfid,
1754                                  const struct lu_name *lname,
1755                                  struct mdd_object *child,
1756                                  struct lu_attr *attr, struct thandle *handle,
1757                                  const struct md_op_spec *spec,
1758                                  struct linkea_data *ldata)
1759 {
1760         int rc;
1761         ENTRY;
1762
1763         /*
1764          * Update attributes for child.
1765          *
1766          * FIXME:
1767          *  (1) the valid bits should be converted between Lustre and Linux;
1768          *  (2) maybe, the child attributes should be set in OSD when creation.
1769          */
1770
1771         rc = mdd_attr_set_internal(env, child, attr, handle, 0);
1772         /* arguments are supposed to stay the same */
1773         if (S_ISDIR(attr->la_mode)) {
1774                 /* Add "." and ".." for newly created dir */
1775                 mdo_ref_add(env, child, handle);
1776                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
1777                                              dot, handle, BYPASS_CAPA);
1778                 if (rc == 0)
1779                         rc = __mdd_index_insert_only(env, child, pfid,
1780                                                      dotdot, handle,
1781                                                      BYPASS_CAPA);
1782                 if (rc != 0)
1783                         mdo_ref_del(env, child, handle);
1784         }
1785
1786         if (rc == 0)
1787                 mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
1788
1789         RETURN(rc);
1790 }
1791
1792 /* has not lock on pobj yet */
1793 static int mdd_create_sanity_check(const struct lu_env *env,
1794                                    struct md_object *pobj,
1795                                    const struct lu_attr *pattr,
1796                                    const struct lu_name *lname,
1797                                    struct lu_attr *cattr,
1798                                    struct md_op_spec *spec)
1799 {
1800         struct mdd_thread_info *info = mdd_env_info(env);
1801         struct lu_fid     *fid       = &info->mti_fid;
1802         struct mdd_object *obj       = md2mdd_obj(pobj);
1803         struct mdd_device *m         = mdo2mdd(pobj);
1804         int rc;
1805         ENTRY;
1806
1807         /* EEXIST check */
1808         if (mdd_is_dead_obj(obj))
1809                 RETURN(-ENOENT);
1810
1811         /*
1812          * In some cases this lookup is not needed - we know before if name
1813          * exists or not because MDT performs lookup for it.
1814          * name length check is done in lookup.
1815          */
1816         if (spec->sp_cr_lookup) {
1817                 /*
1818                  * Check if the name already exist, though it will be checked in
1819                  * _index_insert also, for avoiding rolling back if exists
1820                  * _index_insert.
1821                  */
1822                 rc = __mdd_lookup(env, pobj, pattr, lname, fid,
1823                                   MAY_WRITE | MAY_EXEC);
1824                 if (rc != -ENOENT)
1825                         RETURN(rc ? : -EEXIST);
1826         } else {
1827                 /*
1828                  * Check WRITE permission for the parent.
1829                  * EXEC permission have been checked
1830                  * when lookup before create already.
1831                  */
1832                 rc = mdd_permission_internal_locked(env, obj, pattr, MAY_WRITE,
1833                                                     MOR_TGT_PARENT);
1834                 if (rc)
1835                         RETURN(rc);
1836         }
1837
1838         /* sgid check */
1839         if (pattr->la_mode & S_ISGID) {
1840                 cattr->la_gid = pattr->la_gid;
1841                 if (S_ISDIR(cattr->la_mode)) {
1842                         cattr->la_mode |= S_ISGID;
1843                         cattr->la_valid |= LA_MODE;
1844                 }
1845         }
1846
1847         switch (cattr->la_mode & S_IFMT) {
1848         case S_IFLNK: {
1849                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
1850
1851                 if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
1852                         RETURN(-ENAMETOOLONG);
1853                 else
1854                         RETURN(0);
1855         }
1856         case S_IFDIR:
1857         case S_IFREG:
1858         case S_IFCHR:
1859         case S_IFBLK:
1860         case S_IFIFO:
1861         case S_IFSOCK:
1862                 rc = 0;
1863                 break;
1864         default:
1865                 rc = -EINVAL;
1866                 break;
1867         }
1868         RETURN(rc);
1869 }
1870
1871 static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
1872                               struct mdd_object *p, struct mdd_object *c,
1873                               const struct lu_name *name,
1874                               struct lu_attr *attr,
1875                               struct thandle *handle,
1876                               const struct md_op_spec *spec,
1877                               struct linkea_data *ldata,
1878                               struct lu_buf *def_acl_buf,
1879                               struct lu_buf *acl_buf)
1880 {
1881         int rc;
1882
1883         rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
1884         if (rc)
1885                 GOTO(out, rc);
1886
1887 #ifdef CONFIG_FS_POSIX_ACL
1888         if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
1889                 /* if dir, then can inherit default ACl */
1890                 rc = mdo_declare_xattr_set(env, c, def_acl_buf,
1891                                            XATTR_NAME_ACL_DEFAULT,
1892                                            0, handle);
1893                 if (rc)
1894                         GOTO(out, rc);
1895         }
1896
1897         if (acl_buf->lb_len > 0) {
1898                 rc = mdo_declare_attr_set(env, c, attr, handle);
1899                 if (rc)
1900                         GOTO(out, rc);
1901
1902                 rc = mdo_declare_xattr_set(env, c, acl_buf,
1903                                            XATTR_NAME_ACL_ACCESS, 0, handle);
1904                 if (rc)
1905                         GOTO(out, rc);
1906         }
1907 #endif
1908
1909         if (S_ISDIR(attr->la_mode)) {
1910                 rc = mdo_declare_ref_add(env, p, handle);
1911                 if (rc)
1912                         GOTO(out, rc);
1913         }
1914
1915         rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
1916         if (rc)
1917                 GOTO(out, rc);
1918
1919         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
1920                 rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
1921         else
1922                 rc = mdo_declare_index_insert(env, p, mdo2fid(c),
1923                                               name->ln_name, handle);
1924         if (rc)
1925                 GOTO(out, rc);
1926
1927         /* replay case, create LOV EA from client data */
1928         if (spec->no_create ||
1929             (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
1930                 const struct lu_buf *buf;
1931
1932                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
1933                                         spec->u.sp_ea.eadatalen);
1934                 rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
1935                                            handle);
1936                 if (rc)
1937                         GOTO(out, rc);
1938         }
1939
1940         if (S_ISLNK(attr->la_mode)) {
1941                 rc = dt_declare_record_write(env, mdd_object_child(c),
1942                                              strlen(spec->u.sp_symname), 0,
1943                                              handle);
1944                 if (rc)
1945                         GOTO(out, rc);
1946         }
1947
1948         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
1949                 struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
1950
1951                 *la = *attr;
1952                 la->la_valid = LA_CTIME | LA_MTIME;
1953                 rc = mdo_declare_attr_set(env, p, la, handle);
1954                 if (rc)
1955                         return rc;
1956         }
1957
1958         rc = mdd_declare_changelog_store(env, mdd, name, handle);
1959         if (rc)
1960                 return rc;
1961
1962         /* XXX: For remote create, it should indicate the remote RPC
1963          * will be sent after local transaction is finished, which
1964          * is not very nice, but it will be removed once we fully support
1965          * async update */
1966         if (mdd_object_remote(p) && handle->th_update != NULL)
1967                 handle->th_update->tu_sent_after_local_trans = 1;
1968 out:
1969         return rc;
1970 }
1971
1972 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
1973                         struct lu_attr *la, struct lu_buf *def_acl_buf,
1974                         struct lu_buf *acl_buf)
1975 {
1976         int     rc;
1977         ENTRY;
1978
1979         if (S_ISLNK(la->la_mode)) {
1980                 acl_buf->lb_len = 0;
1981                 def_acl_buf->lb_len = 0;
1982                 RETURN(0);
1983         }
1984
1985         mdd_read_lock(env, pobj, MOR_TGT_PARENT);
1986         rc = mdo_xattr_get(env, pobj, def_acl_buf,
1987                            XATTR_NAME_ACL_DEFAULT, BYPASS_CAPA);
1988         mdd_read_unlock(env, pobj);
1989         if (rc > 0) {
1990                 /* If there are default ACL, fix mode/ACL by default ACL */
1991                 def_acl_buf->lb_len = rc;
1992                 LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
1993                 memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
1994                 acl_buf->lb_len = rc;
1995                 rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
1996                 if (rc < 0)
1997                         RETURN(rc);
1998         } else if (rc == -ENODATA || rc == -EOPNOTSUPP) {
1999                 /* If there are no default ACL, fix mode by mask */
2000                 struct lu_ucred *uc = lu_ucred(env);
2001
2002                 /* The create triggered by MDT internal events, such as
2003                  * LFSCK reset, will not contain valid "uc". */
2004                 if (unlikely(uc != NULL))
2005                         la->la_mode &= ~uc->uc_umask;
2006                 rc = 0;
2007                 acl_buf->lb_len = 0;
2008                 def_acl_buf->lb_len = 0;
2009         }
2010
2011         RETURN(rc);
2012 }
2013
2014 /*
2015  * Create object and insert it into namespace.
2016  */
2017 static int mdd_create(const struct lu_env *env, struct md_object *pobj,
2018                       const struct lu_name *lname, struct md_object *child,
2019                       struct md_op_spec *spec, struct md_attr* ma)
2020 {
2021         struct mdd_thread_info  *info = mdd_env_info(env);
2022         struct lu_attr          *la = &info->mti_la_for_fix;
2023         struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
2024         struct mdd_object       *son = md2mdd_obj(child);
2025         struct mdd_device       *mdd = mdo2mdd(pobj);
2026         struct lu_attr          *attr = &ma->ma_attr;
2027         struct thandle          *handle;
2028         struct lu_attr          *pattr = &info->mti_pattr;
2029         struct lu_buf           acl_buf;
2030         struct lu_buf           def_acl_buf;
2031         struct linkea_data      *ldata = &info->mti_link_data;
2032         const char              *name = lname->ln_name;
2033         int                      rc, created = 0, initialized = 0, inserted = 0;
2034         ENTRY;
2035
2036         /*
2037          * Two operations have to be performed:
2038          *
2039          *  - an allocation of a new object (->do_create()), and
2040          *
2041          *  - an insertion into a parent index (->dio_insert()).
2042          *
2043          * Due to locking, operation order is not important, when both are
2044          * successful, *but* error handling cases are quite different:
2045          *
2046          *  - if insertion is done first, and following object creation fails,
2047          *  insertion has to be rolled back, but this operation might fail
2048          *  also leaving us with dangling index entry.
2049          *
2050          *  - if creation is done first, is has to be undone if insertion
2051          *  fails, leaving us with leaked space, which is neither good, nor
2052          *  fatal.
2053          *
2054          * It seems that creation-first is simplest solution, but it is
2055          * sub-optimal in the frequent
2056          *
2057          *         $ mkdir foo
2058          *         $ mkdir foo
2059          *
2060          * case, because second mkdir is bound to create object, only to
2061          * destroy it immediately.
2062          *
2063          * To avoid this follow local file systems that do double lookup:
2064          *
2065          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
2066          *
2067          *     1. create            (mdd_object_create_internal())
2068          *
2069          *     2. insert            (__mdd_index_insert(), lookup again)
2070          */
2071
2072         rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
2073         if (rc != 0)
2074                 RETURN(rc);
2075
2076         /* Sanity checks before big job. */
2077         rc = mdd_create_sanity_check(env, pobj, pattr, lname, attr, spec);
2078         if (rc)
2079                 RETURN(rc);
2080
2081         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
2082                 GOTO(out_free, rc = -EINPROGRESS);
2083
2084         acl_buf.lb_buf = info->mti_xattr_buf;
2085         acl_buf.lb_len = sizeof(info->mti_xattr_buf);
2086         def_acl_buf.lb_buf = info->mti_key;
2087         def_acl_buf.lb_len = sizeof(info->mti_key);
2088         rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
2089         if (rc < 0)
2090                 GOTO(out_free, rc);
2091
2092         mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
2093
2094         handle = mdd_trans_create(env, mdd);
2095         if (IS_ERR(handle))
2096                 GOTO(out_free, rc = PTR_ERR(handle));
2097
2098         memset(ldata, 0, sizeof(*ldata));
2099         mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
2100                            lname, 1, 0, ldata);
2101
2102         rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
2103                                 handle, spec, ldata, &def_acl_buf, &acl_buf);
2104         if (rc)
2105                 GOTO(out_stop, rc);
2106
2107         rc = mdd_trans_start(env, mdd, handle);
2108         if (rc)
2109                 GOTO(out_stop, rc);
2110
2111         mdd_write_lock(env, son, MOR_TGT_CHILD);
2112         rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
2113         if (rc) {
2114                 mdd_write_unlock(env, son);
2115                 GOTO(cleanup, rc);
2116         }
2117
2118         created = 1;
2119
2120 #ifdef CONFIG_FS_POSIX_ACL
2121         if (def_acl_buf.lb_len > 0 && S_ISDIR(attr->la_mode)) {
2122                 /* set default acl */
2123                 rc = mdo_xattr_set(env, son, &def_acl_buf,
2124                                    XATTR_NAME_ACL_DEFAULT, 0,
2125                                    handle, BYPASS_CAPA);
2126                 if (rc) {
2127                         mdd_write_unlock(env, son);
2128                         GOTO(cleanup, rc);
2129                 }
2130         }
2131         /* set its own acl */
2132         if (acl_buf.lb_len > 0) {
2133                 rc = mdo_xattr_set(env, son, &acl_buf,
2134                                    XATTR_NAME_ACL_ACCESS,
2135                                    0, handle, BYPASS_CAPA);
2136                 if (rc) {
2137                         mdd_write_unlock(env, son);
2138                         GOTO(cleanup, rc);
2139                 }
2140         }
2141 #endif
2142
2143         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
2144                                    son, attr, handle, spec, ldata);
2145
2146         /*
2147          * in case of replay we just set LOVEA provided by the client
2148          * XXX: I think it would be interesting to try "old" way where
2149          *      MDT calls this xattr_set(LOV) in a different transaction.
2150          *      probably this way we code can be made better.
2151          */
2152         if (rc == 0 && (spec->no_create ||
2153                         (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
2154                          S_ISREG(attr->la_mode)))) {
2155                 const struct lu_buf *buf;
2156
2157                 buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
2158                                 spec->u.sp_ea.eadatalen);
2159                 rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
2160                                    BYPASS_CAPA);
2161         }
2162
2163         if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2164                 rc = __mdd_orphan_add(env, son, handle);
2165
2166         mdd_write_unlock(env, son);
2167
2168         if (rc != 0)
2169                 /*
2170                  * Object has no links, so it will be destroyed when last
2171                  * reference is released. (XXX not now.)
2172                  */
2173                 GOTO(cleanup, rc);
2174
2175         initialized = 1;
2176
2177         if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
2178                 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
2179                                         name, S_ISDIR(attr->la_mode), handle,
2180                                         mdd_object_capa(env, mdd_pobj));
2181
2182         if (rc != 0)
2183                 GOTO(cleanup, rc);
2184
2185         inserted = 1;
2186
2187         if (S_ISLNK(attr->la_mode)) {
2188                 struct lu_ucred  *uc = lu_ucred_assert(env);
2189                 struct dt_object *dt = mdd_object_child(son);
2190                 const char *target_name = spec->u.sp_symname;
2191                 int sym_len = strlen(target_name);
2192                 const struct lu_buf *buf;
2193                 loff_t pos = 0;
2194
2195                 buf = mdd_buf_get_const(env, target_name, sym_len);
2196                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
2197                                                 mdd_object_capa(env, son),
2198                                                 uc->uc_cap &
2199                                                 CFS_CAP_SYS_RESOURCE_MASK);
2200
2201                 if (rc == sym_len)
2202                         rc = 0;
2203                 else
2204                         GOTO(cleanup, rc = -EFAULT);
2205         }
2206
2207         /* volatile file creation does not update parent directory times */
2208         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2209                 GOTO(cleanup, rc = 0);
2210
2211         /* update parent directory mtime/ctime */
2212         *la = *attr;
2213         la->la_valid = LA_CTIME | LA_MTIME;
2214         rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
2215         if (rc)
2216                 GOTO(cleanup, rc);
2217
2218         EXIT;
2219 cleanup:
2220         if (rc != 0 && created != 0) {
2221                 int rc2;
2222
2223                 if (inserted != 0) {
2224                         if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
2225                                 rc2 = __mdd_orphan_del(env, son, handle);
2226                         else
2227                                 rc2 = __mdd_index_delete(env, mdd_pobj, name,
2228                                                          S_ISDIR(attr->la_mode),
2229                                                          handle, BYPASS_CAPA);
2230                         if (rc2 != 0)
2231                                 goto out_stop;
2232                 }
2233
2234                 mdd_write_lock(env, son, MOR_TGT_CHILD);
2235                 if (initialized != 0 && S_ISDIR(attr->la_mode)) {
2236                         /* Drop the reference, no need to delete "."/"..",
2237                          * because the object to be destroied directly. */
2238                         rc2 = mdo_ref_del(env, son, handle);
2239                         if (rc2 != 0) {
2240                                 mdd_write_unlock(env, son);
2241                                 goto out_stop;
2242                         }
2243                 }
2244
2245                 rc2 = mdo_ref_del(env, son, handle);
2246                 if (rc2 != 0) {
2247                         mdd_write_unlock(env, son);
2248                         goto out_stop;
2249                 }
2250
2251                 mdo_destroy(env, son, handle);
2252                 mdd_write_unlock(env, son);
2253         }
2254
2255         if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)))
2256                 rc = mdd_changelog_ns_store(env, mdd,
2257                         S_ISDIR(attr->la_mode) ? CL_MKDIR :
2258                         S_ISREG(attr->la_mode) ? CL_CREATE :
2259                         S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
2260                         0, son, mdd_pobj, lname, handle);
2261 out_stop:
2262         mdd_trans_stop(env, mdd, rc, handle);
2263 out_free:
2264         if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
2265                 /* if we vmalloced a large buffer drop it */
2266                 lu_buf_free(ldata->ld_buf);
2267
2268         /* The child object shouldn't be cached anymore */
2269         if (rc)
2270                 set_bit(LU_OBJECT_HEARD_BANSHEE,
2271                             &child->mo_lu.lo_header->loh_flags);
2272         return rc;
2273 }
2274
2275 /*
2276  * Get locks on parents in proper order
2277  * RETURN: < 0 - error, rename_order if successful
2278  */
2279 enum rename_order {
2280         MDD_RN_SAME,
2281         MDD_RN_SRCTGT,
2282         MDD_RN_TGTSRC
2283 };
2284
2285 static int mdd_rename_order(const struct lu_env *env,
2286                             struct mdd_device *mdd,
2287                             struct mdd_object *src_pobj,
2288                             const struct lu_attr *pattr,
2289                             struct mdd_object *tgt_pobj)
2290 {
2291         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
2292         int rc;
2293         ENTRY;
2294
2295         if (src_pobj == tgt_pobj)
2296                 RETURN(MDD_RN_SAME);
2297
2298         /* compared the parent child relationship of src_p&tgt_p */
2299         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
2300                 rc = MDD_RN_SRCTGT;
2301         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
2302                 rc = MDD_RN_TGTSRC;
2303         } else {
2304                 rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
2305                                    NULL);
2306                 if (rc == -EREMOTE)
2307                         rc = 0;
2308
2309                 if (rc == 1)
2310                         rc = MDD_RN_TGTSRC;
2311                 else if (rc == 0)
2312                         rc = MDD_RN_SRCTGT;
2313         }
2314
2315         RETURN(rc);
2316 }
2317
2318 /* has not mdd_write{read}_lock on any obj yet. */
2319 static int mdd_rename_sanity_check(const struct lu_env *env,
2320                                    struct mdd_object *src_pobj,
2321                                    const struct lu_attr *pattr,
2322                                    struct mdd_object *tgt_pobj,
2323                                    const struct lu_attr *tpattr,
2324                                    struct mdd_object *sobj,
2325                                    const struct lu_attr *cattr,
2326                                    struct mdd_object *tobj,
2327                                    const struct lu_attr *tattr)
2328 {
2329         int rc = 0;
2330         ENTRY;
2331
2332         /* XXX: when get here, sobj must NOT be NULL,
2333          * the other case has been processed in cld_rename
2334          * before mdd_rename and enable MDS_PERM_BYPASS. */
2335         LASSERT(sobj);
2336
2337         rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
2338         if (rc)
2339                 RETURN(rc);
2340
2341         /* XXX: when get here, "tobj == NULL" means tobj must
2342          * NOT exist (neither on remote MDS, such case has been
2343          * processed in cld_rename before mdd_rename and enable
2344          * MDS_PERM_BYPASS).
2345          * So check may_create, but not check may_unlink. */
2346         if (!tobj)
2347                 rc = mdd_may_create(env, tgt_pobj, tpattr, NULL,
2348                                     (src_pobj != tgt_pobj), 0);
2349         else
2350                 rc = mdd_may_delete(env, tgt_pobj, tpattr, tobj, tattr, cattr,
2351                                     (src_pobj != tgt_pobj), 1);
2352
2353         if (!rc && !tobj && (src_pobj != tgt_pobj) && S_ISDIR(cattr->la_mode))
2354                 rc = __mdd_may_link(env, tgt_pobj, tpattr);
2355
2356         RETURN(rc);
2357 }
2358
2359 static int mdd_declare_rename(const struct lu_env *env,
2360                               struct mdd_device *mdd,
2361                               struct mdd_object *mdd_spobj,
2362                               struct mdd_object *mdd_tpobj,
2363                               struct mdd_object *mdd_sobj,
2364                               struct mdd_object *mdd_tobj,
2365                               const struct lu_name *tname,
2366                               const struct lu_name *sname,
2367                               struct md_attr *ma,
2368                               struct linkea_data *ldata,
2369                               struct thandle *handle)
2370 {
2371         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2372         int rc;
2373
2374         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2375         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2376
2377         LASSERT(mdd_spobj);
2378         LASSERT(mdd_tpobj);
2379         LASSERT(mdd_sobj);
2380
2381         /* name from source dir */
2382         rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
2383         if (rc)
2384                 return rc;
2385
2386         /* .. from source child */
2387         if (S_ISDIR(mdd_object_type(mdd_sobj))) {
2388                 /* source child can be directory,
2389                  * counted by source dir's nlink */
2390                 rc = mdo_declare_ref_del(env, mdd_spobj, handle);
2391                 if (rc)
2392                         return rc;
2393                 if (mdd_spobj != mdd_tpobj) {
2394                         rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
2395                                                       handle);
2396                         if (rc)
2397                                 return rc;
2398
2399                         rc = mdo_declare_index_insert(env, mdd_sobj,
2400                                                       mdo2fid(mdd_tpobj),
2401                                                       dotdot, handle);
2402                         if (rc)
2403                                 return rc;
2404                 }
2405                 /* new target child can be directory,
2406                  * counted by target dir's nlink */
2407                 rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
2408                 if (rc)
2409                         return rc;
2410         }
2411
2412         la->la_valid = LA_CTIME | LA_MTIME;
2413         rc = mdo_declare_attr_set(env, mdd_spobj, la, handle);
2414         if (rc != 0)
2415                 return rc;
2416
2417         rc = mdo_declare_attr_set(env, mdd_tpobj, la, handle);
2418         if (rc != 0)
2419                 return rc;
2420
2421         la->la_valid = LA_CTIME;
2422         rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
2423         if (rc)
2424                 return rc;
2425
2426         rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
2427         if (rc)
2428                 return rc;
2429
2430         /* new name */
2431         rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
2432                         tname->ln_name, handle);
2433         if (rc)
2434                 return rc;
2435
2436         /* name from target dir (old name), we declare it unconditionally
2437          * as mdd_rename() calls delete unconditionally as well. so just
2438          * to balance declarations vs calls to change ... */
2439         rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
2440         if (rc)
2441                 return rc;
2442
2443         if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
2444                 /* delete target child in target parent directory */
2445                 rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2446                 if (rc)
2447                         return rc;
2448
2449                 if (S_ISDIR(mdd_object_type(mdd_tobj))) {
2450                         /* target child can be directory,
2451                          * delete "." reference in target child directory */
2452                         rc = mdo_declare_ref_del(env, mdd_tobj, handle);
2453                         if (rc)
2454                                 return rc;
2455
2456                         /* delete ".." reference in target parent directory */
2457                         rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
2458                         if (rc)
2459                                 return rc;
2460                 }
2461
2462                 la->la_valid = LA_CTIME;
2463                 rc = mdo_declare_attr_set(env, mdd_tobj, la, handle);
2464                 if (rc)
2465                         return rc;
2466
2467                 rc = mdd_declare_links_del(env, mdd_tobj, handle);
2468                 if (rc)
2469                         return rc;
2470
2471                 rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
2472                 if (rc)
2473                         return rc;
2474         }
2475
2476         rc = mdd_declare_changelog_ext_store(env, mdd, tname, sname, handle);
2477         if (rc)
2478                 return rc;
2479
2480         return rc;
2481 }
2482
2483 /* src object can be remote that is why we use only fid and type of object */
2484 static int mdd_rename(const struct lu_env *env,
2485                       struct md_object *src_pobj, struct md_object *tgt_pobj,
2486                       const struct lu_fid *lf, const struct lu_name *lsname,
2487                       struct md_object *tobj, const struct lu_name *ltname,
2488                       struct md_attr *ma)
2489 {
2490         const char *sname = lsname->ln_name;
2491         const char *tname = ltname->ln_name;
2492         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
2493         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
2494         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
2495         struct mdd_device *mdd = mdo2mdd(src_pobj);
2496         struct mdd_object *mdd_sobj = NULL;                  /* source object */
2497         struct mdd_object *mdd_tobj = NULL;
2498         struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
2499         struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
2500         struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
2501         struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
2502         struct thandle *handle;
2503         struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
2504         const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
2505         const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
2506         bool is_dir;
2507         bool tobj_ref = 0;
2508         bool tobj_locked = 0;
2509         unsigned cl_flags = 0;
2510         int rc, rc2;
2511         ENTRY;
2512
2513         if (tobj)
2514                 mdd_tobj = md2mdd_obj(tobj);
2515
2516         mdd_sobj = mdd_object_find(env, mdd, lf);
2517
2518         rc = mdd_la_get(env, mdd_sobj, cattr, BYPASS_CAPA);
2519         if (rc)
2520                 GOTO(out_pending, rc);
2521
2522         rc = mdd_la_get(env, mdd_spobj, pattr, BYPASS_CAPA);
2523         if (rc)
2524                 GOTO(out_pending, rc);
2525
2526         if (mdd_tobj) {
2527                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2528                 if (rc)
2529                         GOTO(out_pending, rc);
2530         }
2531
2532         rc = mdd_la_get(env, mdd_tpobj, tpattr, BYPASS_CAPA);
2533         if (rc)
2534                 GOTO(out_pending, rc);
2535
2536         rc = mdd_rename_sanity_check(env, mdd_spobj, pattr, mdd_tpobj, tpattr,
2537                                      mdd_sobj, cattr, mdd_tobj, tattr);
2538         if (rc)
2539                 GOTO(out_pending, rc);
2540
2541         handle = mdd_trans_create(env, mdd);
2542         if (IS_ERR(handle))
2543                 GOTO(out_pending, rc = PTR_ERR(handle));
2544
2545         memset(ldata, 0, sizeof(*ldata));
2546         mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdd_object_fid(mdd_tpobj),
2547                            ltname, 1, 0, ldata);
2548         rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
2549                                 mdd_tobj, lsname, ltname, ma, ldata, handle);
2550         if (rc)
2551                 GOTO(stop, rc);
2552
2553         rc = mdd_trans_start(env, mdd, handle);
2554         if (rc)
2555                 GOTO(stop, rc);
2556
2557         /* FIXME: Should consider tobj and sobj too in rename_lock. */
2558         rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
2559         if (rc < 0)
2560                 GOTO(cleanup_unlocked, rc);
2561
2562         is_dir = S_ISDIR(cattr->la_mode);
2563
2564         /* Remove source name from source directory */
2565         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
2566                                 mdd_object_capa(env, mdd_spobj));
2567         if (rc)
2568                 GOTO(cleanup, rc);
2569
2570         /* "mv dir1 dir2" needs "dir1/.." link update */
2571         if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) {
2572                 rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2573                                         mdd_object_capa(env, mdd_sobj));
2574                 if (rc)
2575                         GOTO(fixup_spobj2, rc);
2576
2577                 rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot,
2578                                       handle, mdd_object_capa(env, mdd_sobj));
2579                 if (rc)
2580                         GOTO(fixup_spobj, rc);
2581         }
2582
2583         /* Remove target name from target directory
2584          * Here tobj can be remote one, so we do index_delete unconditionally
2585          * and -ENOENT is allowed.
2586          */
2587         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2588                                 mdd_object_capa(env, mdd_tpobj));
2589         if (rc != 0) {
2590                 if (mdd_tobj) {
2591                         /* tname might been renamed to something else */
2592                         GOTO(fixup_spobj, rc);
2593                 }
2594                 if (rc != -ENOENT)
2595                         GOTO(fixup_spobj, rc);
2596         }
2597
2598         /* Insert new fid with target name into target dir */
2599         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
2600                                 mdd_object_capa(env, mdd_tpobj));
2601         if (rc)
2602                 GOTO(fixup_tpobj, rc);
2603
2604         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2605         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
2606
2607         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
2608         if (mdd_sobj) {
2609                 la->la_valid = LA_CTIME;
2610                 rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
2611                 if (rc)
2612                         GOTO(fixup_tpobj, rc);
2613         }
2614
2615         /* Remove old target object
2616          * For tobj is remote case cmm layer has processed
2617          * and set tobj to NULL then. So when tobj is NOT NULL,
2618          * it must be local one.
2619          */
2620         if (tobj && mdd_object_exists(mdd_tobj)) {
2621                 mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
2622                 tobj_locked = 1;
2623                 if (mdd_is_dead_obj(mdd_tobj)) {
2624                         /* shld not be dead, something is wrong */
2625                         CERROR("tobj is dead, something is wrong\n");
2626                         rc = -EINVAL;
2627                         goto cleanup;
2628                 }
2629                 mdo_ref_del(env, mdd_tobj, handle);
2630
2631                 /* Remove dot reference. */
2632                 if (S_ISDIR(tattr->la_mode))
2633                         mdo_ref_del(env, mdd_tobj, handle);
2634                 tobj_ref = 1;
2635
2636                 /* fetch updated nlink */
2637                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2638                 if (rc != 0) {
2639                         CERROR("%s: Failed to get nlink for tobj "
2640                                 DFID": rc = %d\n",
2641                                 mdd2obd_dev(mdd)->obd_name,
2642                                 PFID(tpobj_fid), rc);
2643                         GOTO(fixup_tpobj, rc);
2644                 }
2645
2646                 la->la_valid = LA_CTIME;
2647                 rc = mdd_update_time(env, mdd_tobj, tattr, la, handle);
2648                 if (rc != 0) {
2649                         CERROR("%s: Failed to set ctime for tobj "
2650                                 DFID": rc = %d\n",
2651                                 mdd2obd_dev(mdd)->obd_name,
2652                                 PFID(tpobj_fid), rc);
2653                         GOTO(fixup_tpobj, rc);
2654                 }
2655
2656                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2657                 ma->ma_attr = *tattr;
2658                 ma->ma_valid |= MA_INODE;
2659                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
2660                 if (rc != 0) {
2661                         CERROR("%s: Failed to unlink tobj "
2662                                 DFID": rc = %d\n",
2663                                 mdd2obd_dev(mdd)->obd_name,
2664                                 PFID(tpobj_fid), rc);
2665                         GOTO(fixup_tpobj, rc);
2666                 }
2667
2668                 /* fetch updated nlink */
2669                 rc = mdd_la_get(env, mdd_tobj, tattr, BYPASS_CAPA);
2670                 if (rc != 0) {
2671                         CERROR("%s: Failed to get nlink for tobj "
2672                                 DFID": rc = %d\n",
2673                                 mdd2obd_dev(mdd)->obd_name,
2674                                 PFID(tpobj_fid), rc);
2675                         GOTO(fixup_tpobj, rc);
2676                 }
2677                 /* XXX: this transfer to ma will be removed with LOD/OSP */
2678                 ma->ma_attr = *tattr;
2679                 ma->ma_valid |= MA_INODE;
2680
2681                 if (tattr->la_nlink == 0) {
2682                         cl_flags |= CLF_RENAME_LAST;
2683                         if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
2684                                 cl_flags |= CLF_RENAME_LAST_EXISTS;
2685                 }
2686         }
2687
2688         la->la_valid = LA_CTIME | LA_MTIME;
2689         rc = mdd_update_time(env, mdd_spobj, pattr, la, handle);
2690         if (rc)
2691                 GOTO(fixup_tpobj, rc);
2692
2693         if (mdd_spobj != mdd_tpobj) {
2694                 la->la_valid = LA_CTIME | LA_MTIME;
2695                 rc = mdd_update_time(env, mdd_tpobj, tpattr, la, handle);
2696         }
2697
2698         if (rc == 0 && mdd_sobj) {
2699                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
2700                 rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
2701                                       mdo2fid(mdd_tpobj), ltname, handle, NULL,
2702                                       0, 0);
2703                 if (rc == -ENOENT)
2704                         /* Old files might not have EA entry */
2705                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
2706                                       lsname, handle, NULL, 0);
2707                 mdd_write_unlock(env, mdd_sobj);
2708                 /* We don't fail the transaction if the link ea can't be
2709                    updated -- fid2path will use alternate lookup method. */
2710                 rc = 0;
2711         }
2712
2713         EXIT;
2714
2715 fixup_tpobj:
2716         if (rc) {
2717                 rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
2718                                          BYPASS_CAPA);
2719                 if (rc2)
2720                         CWARN("tp obj fix error %d\n",rc2);
2721
2722                 if (mdd_tobj && mdd_object_exists(mdd_tobj) &&
2723                     !mdd_is_dead_obj(mdd_tobj)) {
2724                         if (tobj_ref) {
2725                                 mdo_ref_add(env, mdd_tobj, handle);
2726                                 if (is_dir)
2727                                         mdo_ref_add(env, mdd_tobj, handle);
2728                         }
2729
2730                         rc2 = __mdd_index_insert(env, mdd_tpobj,
2731                                          mdo2fid(mdd_tobj), tname,
2732                                          is_dir, handle,
2733                                          BYPASS_CAPA);
2734
2735                         if (rc2)
2736                                 CWARN("tp obj fix error %d\n",rc2);
2737                 }
2738         }
2739
2740 fixup_spobj:
2741         if (rc && is_dir && mdd_sobj && mdd_spobj != mdd_tpobj) {
2742                 rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle,
2743                                               BYPASS_CAPA);
2744
2745                 if (rc2)
2746                         CWARN("%s: sp obj dotdot delete error: rc = %d\n",
2747                                mdd2obd_dev(mdd)->obd_name, rc2);
2748
2749
2750                 rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid,
2751                                               dotdot, handle, BYPASS_CAPA);
2752                 if (rc2)
2753                         CWARN("%s: sp obj dotdot insert error: rc = %d\n",
2754                               mdd2obd_dev(mdd)->obd_name, rc2);
2755         }
2756
2757 fixup_spobj2:
2758         if (rc) {
2759                 rc2 = __mdd_index_insert(env, mdd_spobj,
2760                                          lf, sname, is_dir, handle, BYPASS_CAPA);
2761                 if (rc2)
2762                         CWARN("sp obj fix error %d\n",rc2);
2763         }
2764 cleanup:
2765         if (tobj_locked)
2766                 mdd_write_unlock(env, mdd_tobj);
2767 cleanup_unlocked:
2768         if (rc == 0)
2769                 rc = mdd_changelog_ext_ns_store(env, mdd, CL_RENAME, cl_flags,
2770                                                 mdd_tobj, tpobj_fid, lf,
2771                                                 spobj_fid, ltname, lsname,
2772                                                 handle);
2773
2774 stop:
2775         mdd_trans_stop(env, mdd, rc, handle);
2776 out_pending:
2777         mdd_object_put(env, mdd_sobj);
2778         return rc;
2779 }
2780
2781 const struct md_dir_operations mdd_dir_ops = {
2782         .mdo_is_subdir     = mdd_is_subdir,
2783         .mdo_lookup        = mdd_lookup,
2784         .mdo_create        = mdd_create,
2785         .mdo_rename        = mdd_rename,
2786         .mdo_link          = mdd_link,
2787         .mdo_unlink        = mdd_unlink,
2788         .mdo_create_data   = mdd_create_data,
2789 };