Whamcloud - gitweb
cb742d5f45dc772d8e380c469192a3d001e75673
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         *buf = LU_BUF_NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 *buf = LU_BUF_NULL;
146         }
147         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         *buf = LU_BUF_NULL;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct lu_object *mdd_object_alloc(const struct lu_env *env,
183                                    const struct lu_object_header *hdr,
184                                    struct lu_device *d)
185 {
186         struct mdd_object *mdd_obj;
187
188         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
189         if (mdd_obj != NULL) {
190                 struct lu_object *o;
191
192                 o = mdd2lu_obj(mdd_obj);
193                 lu_object_init(o, NULL, d);
194                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
195                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
196                 mdd_obj->mod_count = 0;
197                 o->lo_ops = &mdd_lu_obj_ops;
198                 return o;
199         } else {
200                 return NULL;
201         }
202 }
203
204 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
205                            const struct lu_object_conf *unused)
206 {
207         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
208         struct mdd_object *mdd_obj = lu2mdd_obj(o);
209         struct lu_object  *below;
210         struct lu_device  *under;
211         ENTRY;
212
213         mdd_obj->mod_cltime = 0;
214         under = &d->mdd_child->dd_lu_dev;
215         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
216         mdd_pdlock_init(mdd_obj);
217         if (below == NULL)
218                 RETURN(-ENOMEM);
219
220         lu_object_add(o, below);
221
222         RETURN(0);
223 }
224
225 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
226 {
227         if (lu_object_exists(o))
228                 return mdd_get_flags(env, lu2mdd_obj(o));
229         else
230                 return 0;
231 }
232
233 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
234 {
235         struct mdd_object *mdd = lu2mdd_obj(o);
236
237         lu_object_fini(o);
238         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
239 }
240
241 static int mdd_object_print(const struct lu_env *env, void *cookie,
242                             lu_printer_t p, const struct lu_object *o)
243 {
244         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
245         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
246                     "valid=%x, cltime="LPU64", flags=%lx)",
247                     mdd, mdd->mod_count, mdd->mod_valid,
248                     mdd->mod_cltime, mdd->mod_flags);
249 }
250
251 static const struct lu_object_operations mdd_lu_obj_ops = {
252         .loo_object_init    = mdd_object_init,
253         .loo_object_start   = mdd_object_start,
254         .loo_object_free    = mdd_object_free,
255         .loo_object_print   = mdd_object_print,
256 };
257
258 struct mdd_object *mdd_object_find(const struct lu_env *env,
259                                    struct mdd_device *d,
260                                    const struct lu_fid *f)
261 {
262         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
263 }
264
265 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
266                         const char *path, struct lu_fid *fid)
267 {
268         struct lu_buf *buf;
269         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
270         struct mdd_object *obj;
271         struct lu_name *lname = &mdd_env_info(env)->mti_name;
272         char *name;
273         int rc = 0;
274         ENTRY;
275
276         /* temp buffer for path element */
277         buf = mdd_buf_alloc(env, PATH_MAX);
278         if (buf->lb_buf == NULL)
279                 RETURN(-ENOMEM);
280
281         lname->ln_name = name = buf->lb_buf;
282         lname->ln_namelen = 0;
283         *f = mdd->mdd_root_fid;
284
285         while(1) {
286                 while (*path == '/')
287                         path++;
288                 if (*path == '\0')
289                         break;
290                 while (*path != '/' && *path != '\0') {
291                         *name = *path;
292                         path++;
293                         name++;
294                         lname->ln_namelen++;
295                 }
296
297                 *name = '\0';
298                 /* find obj corresponding to fid */
299                 obj = mdd_object_find(env, mdd, f);
300                 if (obj == NULL)
301                         GOTO(out, rc = -EREMOTE);
302                 if (IS_ERR(obj))
303                         GOTO(out, rc = PTR_ERR(obj));
304                 /* get child fid from parent and name */
305                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
306                 mdd_object_put(env, obj);
307                 if (rc)
308                         break;
309
310                 name = buf->lb_buf;
311                 lname->ln_namelen = 0;
312         }
313
314         if (!rc)
315                 *fid = *f;
316 out:
317         RETURN(rc);
318 }
319
320 /** The maximum depth that fid2path() will search.
321  * This is limited only because we want to store the fids for
322  * historical path lookup purposes.
323  */
324 #define MAX_PATH_DEPTH 100
325
326 /** mdd_path() lookup structure. */
327 struct path_lookup_info {
328         __u64                pli_recno;        /**< history point */
329         __u64                pli_currec;       /**< current record */
330         struct lu_fid        pli_fid;
331         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
332         struct mdd_object   *pli_mdd_obj;
333         char                *pli_path;         /**< full path */
334         int                  pli_pathlen;
335         int                  pli_linkno;       /**< which hardlink to follow */
336         int                  pli_fidcount;     /**< number of \a pli_fids */
337 };
338
339 static int mdd_path_current(const struct lu_env *env,
340                             struct path_lookup_info *pli)
341 {
342         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
343         struct mdd_object *mdd_obj;
344         struct lu_buf     *buf = NULL;
345         struct link_ea_header *leh;
346         struct link_ea_entry  *lee;
347         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
348         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
349         char *ptr;
350         int reclen;
351         int rc;
352         ENTRY;
353
354         ptr = pli->pli_path + pli->pli_pathlen - 1;
355         *ptr = 0;
356         --ptr;
357         pli->pli_fidcount = 0;
358         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
359
360         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
361                 mdd_obj = mdd_object_find(env, mdd,
362                                           &pli->pli_fids[pli->pli_fidcount]);
363                 if (mdd_obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(mdd_obj))
366                         GOTO(out, rc = PTR_ERR(mdd_obj));
367                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
368                 if (rc <= 0) {
369                         mdd_object_put(env, mdd_obj);
370                         if (rc == -1)
371                                 rc = -EREMOTE;
372                         else if (rc == 0)
373                                 /* Do I need to error out here? */
374                                 rc = -ENOENT;
375                         GOTO(out, rc);
376                 }
377
378                 /* Get parent fid and object name */
379                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
380                 buf = mdd_links_get(env, mdd_obj);
381                 mdd_read_unlock(env, mdd_obj);
382                 mdd_object_put(env, mdd_obj);
383                 if (IS_ERR(buf))
384                         GOTO(out, rc = PTR_ERR(buf));
385
386                 leh = buf->lb_buf;
387                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
388                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
389
390                 /* If set, use link #linkno for path lookup, otherwise use
391                    link #0.  Only do this for the final path element. */
392                 if ((pli->pli_fidcount == 0) &&
393                     (pli->pli_linkno < leh->leh_reccount)) {
394                         int count;
395                         for (count = 0; count < pli->pli_linkno; count++) {
396                                 lee = (struct link_ea_entry *)
397                                      ((char *)lee + reclen);
398                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
399                         }
400                         if (pli->pli_linkno < leh->leh_reccount - 1)
401                                 /* indicate to user there are more links */
402                                 pli->pli_linkno++;
403                 }
404
405                 /* Pack the name in the end of the buffer */
406                 ptr -= tmpname->ln_namelen;
407                 if (ptr - 1 <= pli->pli_path)
408                         GOTO(out, rc = -EOVERFLOW);
409                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
410                 *(--ptr) = '/';
411
412                 /* Store the parent fid for historic lookup */
413                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
414                         GOTO(out, rc = -EOVERFLOW);
415                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
416         }
417
418         /* Verify that our path hasn't changed since we started the lookup.
419            Record the current index, and verify the path resolves to the
420            same fid. If it does, then the path is correct as of this index. */
421         spin_lock(&mdd->mdd_cl.mc_lock);
422         pli->pli_currec = mdd->mdd_cl.mc_index;
423         spin_unlock(&mdd->mdd_cl.mc_lock);
424         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
425         if (rc) {
426                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
427                 GOTO (out, rc = -EAGAIN);
428         }
429         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
430                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
431                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
432                        PFID(&pli->pli_fid));
433                 GOTO(out, rc = -EAGAIN);
434         }
435         ptr++; /* skip leading / */
436         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
437
438         EXIT;
439 out:
440         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
441                 /* if we vmalloced a large buffer drop it */
442                 mdd_buf_put(buf);
443
444         return rc;
445 }
446
447 static int mdd_path_historic(const struct lu_env *env,
448                              struct path_lookup_info *pli)
449 {
450         return 0;
451 }
452
453 /* Returns the full path to this fid, as of changelog record recno. */
454 static int mdd_path(const struct lu_env *env, struct md_object *obj,
455                     char *path, int pathlen, __u64 *recno, int *linkno)
456 {
457         struct path_lookup_info *pli;
458         int tries = 3;
459         int rc = -EAGAIN;
460         ENTRY;
461
462         if (pathlen < 3)
463                 RETURN(-EOVERFLOW);
464
465         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
466                 path[0] = '\0';
467                 RETURN(0);
468         }
469
470         OBD_ALLOC_PTR(pli);
471         if (pli == NULL)
472                 RETURN(-ENOMEM);
473
474         pli->pli_mdd_obj = md2mdd_obj(obj);
475         pli->pli_recno = *recno;
476         pli->pli_path = path;
477         pli->pli_pathlen = pathlen;
478         pli->pli_linkno = *linkno;
479
480         /* Retry multiple times in case file is being moved */
481         while (tries-- && rc == -EAGAIN)
482                 rc = mdd_path_current(env, pli);
483
484         /* For historical path lookup, the current links may not have existed
485          * at "recno" time.  We must switch over to earlier links/parents
486          * by using the changelog records.  If the earlier parent doesn't
487          * exist, we must search back through the changelog to reconstruct
488          * its parents, then check if it exists, etc.
489          * We may ignore this problem for the initial implementation and
490          * state that an "original" hardlink must still exist for us to find
491          * historic path name. */
492         if (pli->pli_recno != -1) {
493                 rc = mdd_path_historic(env, pli);
494         } else {
495                 *recno = pli->pli_currec;
496                 /* Return next link index to caller */
497                 *linkno = pli->pli_linkno;
498         }
499
500         OBD_FREE_PTR(pli);
501
502         RETURN (rc);
503 }
504
505 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
506 {
507         struct lu_attr *la = &mdd_env_info(env)->mti_la;
508         int rc;
509
510         ENTRY;
511         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
512         if (rc == 0) {
513                 mdd_flags_xlate(obj, la->la_flags);
514         }
515         RETURN(rc);
516 }
517
518 /*
519  * No permission check is needed.
520  */
521 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
522                  struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526
527         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
528                           mdd_object_capa(env, md2mdd_obj(obj)));
529         RETURN(rc);
530 }
531
532 /*
533  * No permission check is needed.
534  */
535 static int mdd_xattr_get(const struct lu_env *env,
536                          struct md_object *obj, struct lu_buf *buf,
537                          const char *name)
538 {
539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
540         int rc;
541
542         ENTRY;
543
544         if (mdd_object_exists(mdd_obj) == 0) {
545                 CERROR("%s: object "DFID" not found: rc = -2\n",
546                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
547                 return -ENOENT;
548         }
549
550         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
551         rc = mdo_xattr_get(env, mdd_obj, buf, name,
552                            mdd_object_capa(env, mdd_obj));
553         mdd_read_unlock(env, mdd_obj);
554
555         RETURN(rc);
556 }
557
558 /*
559  * Permission check is done when open,
560  * no need check again.
561  */
562 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
563                         struct lu_buf *buf)
564 {
565         struct mdd_object *mdd_obj = md2mdd_obj(obj);
566         struct dt_object  *next;
567         loff_t             pos = 0;
568         int                rc;
569         ENTRY;
570
571         if (mdd_object_exists(mdd_obj) == 0) {
572                 CERROR("%s: object "DFID" not found: rc = -2\n",
573                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
574                 return -ENOENT;
575         }
576
577         next = mdd_object_child(mdd_obj);
578         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
579         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
580                                          mdd_object_capa(env, mdd_obj));
581         mdd_read_unlock(env, mdd_obj);
582         RETURN(rc);
583 }
584
585 /*
586  * No permission check is needed.
587  */
588 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
589                           struct lu_buf *buf)
590 {
591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
592         int rc;
593
594         ENTRY;
595
596         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
597         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
598         mdd_read_unlock(env, mdd_obj);
599
600         RETURN(rc);
601 }
602
603 int mdd_declare_object_create_internal(const struct lu_env *env,
604                                        struct mdd_object *p,
605                                        struct mdd_object *c,
606                                        struct lu_attr *attr,
607                                        struct thandle *handle,
608                                        const struct md_op_spec *spec)
609 {
610         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
611         const struct dt_index_features *feat = spec->sp_feat;
612         int rc;
613         ENTRY;
614
615         if (feat != &dt_directory_features && feat != NULL) {
616                 dof->dof_type = DFT_INDEX;
617                 dof->u.dof_idx.di_feat = feat;
618
619         } else {
620                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
621                 if (dof->dof_type == DFT_REGULAR) {
622                         dof->u.dof_reg.striped =
623                                 md_should_create(spec->sp_cr_flags);
624                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
625                                 dof->u.dof_reg.striped = 0;
626                         /* is this replay? */
627                         if (spec->no_create)
628                                 dof->u.dof_reg.striped = 0;
629                 }
630         }
631
632         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
633
634         RETURN(rc);
635 }
636
637 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
638                                struct mdd_object *c, struct lu_attr *attr,
639                                struct thandle *handle,
640                                const struct md_op_spec *spec)
641 {
642         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
643         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
644         int rc;
645         ENTRY;
646
647         LASSERT(!mdd_object_exists(c));
648
649         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
650
651         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
652
653         RETURN(rc);
654 }
655
656 /**
657  * Make sure the ctime is increased only.
658  */
659 static inline int mdd_attr_check(const struct lu_env *env,
660                                  struct mdd_object *obj,
661                                  struct lu_attr *attr)
662 {
663         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
664         int rc;
665         ENTRY;
666
667         if (attr->la_valid & LA_CTIME) {
668                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
669                 if (rc)
670                         RETURN(rc);
671
672                 if (attr->la_ctime < tmp_la->la_ctime)
673                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
674                 else if (attr->la_valid == LA_CTIME &&
675                          attr->la_ctime == tmp_la->la_ctime)
676                         attr->la_valid &= ~LA_CTIME;
677         }
678         RETURN(0);
679 }
680
681 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
682                           struct lu_attr *attr, struct thandle *handle,
683                           int needacl)
684 {
685         int rc;
686         ENTRY;
687
688         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
689 #ifdef CONFIG_FS_POSIX_ACL
690         if (!rc && (attr->la_valid & LA_MODE) && needacl)
691                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
692 #endif
693         RETURN(rc);
694 }
695
696 int mdd_attr_check_set_internal(const struct lu_env *env,
697                                 struct mdd_object *obj, struct lu_attr *attr,
698                                 struct thandle *handle, int needacl)
699 {
700         int rc;
701         ENTRY;
702
703         rc = mdd_attr_check(env, obj, attr);
704         if (rc)
705                 RETURN(rc);
706
707         if (attr->la_valid)
708                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
709         RETURN(rc);
710 }
711
712 /*
713  * This gives the same functionality as the code between
714  * sys_chmod and inode_setattr
715  * chown_common and inode_setattr
716  * utimes and inode_setattr
717  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
718  */
719 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
720                         struct lu_attr *la, const unsigned long flags)
721 {
722         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
723         struct lu_ucred  *uc;
724         int               rc;
725         ENTRY;
726
727         if (!la->la_valid)
728                 RETURN(0);
729
730         /* Do not permit change file type */
731         if (la->la_valid & LA_TYPE)
732                 RETURN(-EPERM);
733
734         /* They should not be processed by setattr */
735         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
736                 RETURN(-EPERM);
737
738         /* export destroy does not have ->le_ses, but we may want
739          * to drop LUSTRE_SOM_FL. */
740         uc = lu_ucred_check(env);
741         if (uc == NULL)
742                 RETURN(0);
743
744         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
745         if (rc)
746                 RETURN(rc);
747
748         if (la->la_valid == LA_CTIME) {
749                 if (!(flags & MDS_PERM_BYPASS))
750                         /* This is only for set ctime when rename's source is
751                          * on remote MDS. */
752                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
753                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
754                         la->la_valid &= ~LA_CTIME;
755                 RETURN(rc);
756         }
757
758         if (la->la_valid == LA_ATIME) {
759                 /* This is atime only set for read atime update on close. */
760                 if (la->la_atime >= tmp_la->la_atime &&
761                     la->la_atime < (tmp_la->la_atime +
762                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
763                         la->la_valid &= ~LA_ATIME;
764                 RETURN(0);
765         }
766
767         /* Check if flags change. */
768         if (la->la_valid & LA_FLAGS) {
769                 unsigned int oldflags = 0;
770                 unsigned int newflags = la->la_flags &
771                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
772
773                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
774                     !mdd_capable(uc, CFS_CAP_FOWNER))
775                         RETURN(-EPERM);
776
777                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
778                  * only be changed by the relevant capability. */
779                 if (mdd_is_immutable(obj))
780                         oldflags |= LUSTRE_IMMUTABLE_FL;
781                 if (mdd_is_append(obj))
782                         oldflags |= LUSTRE_APPEND_FL;
783                 if ((oldflags ^ newflags) &&
784                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
785                         RETURN(-EPERM);
786
787                 if (!S_ISDIR(tmp_la->la_mode))
788                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
789         }
790
791         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
792             (la->la_valid & ~LA_FLAGS) &&
793             !(flags & MDS_PERM_BYPASS))
794                 RETURN(-EPERM);
795
796         /* Check for setting the obj time. */
797         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
798             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
799                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
800                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
801                         rc = mdd_permission_internal(env, obj, tmp_la,
802                                                      MAY_WRITE);
803                         if (rc)
804                                 RETURN(rc);
805                 }
806         }
807
808         if (la->la_valid & LA_KILL_SUID) {
809                 la->la_valid &= ~LA_KILL_SUID;
810                 if ((tmp_la->la_mode & S_ISUID) &&
811                     !(la->la_valid & LA_MODE)) {
812                         la->la_mode = tmp_la->la_mode;
813                         la->la_valid |= LA_MODE;
814                 }
815                 la->la_mode &= ~S_ISUID;
816         }
817
818         if (la->la_valid & LA_KILL_SGID) {
819                 la->la_valid &= ~LA_KILL_SGID;
820                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
821                                         (S_ISGID | S_IXGRP)) &&
822                     !(la->la_valid & LA_MODE)) {
823                         la->la_mode = tmp_la->la_mode;
824                         la->la_valid |= LA_MODE;
825                 }
826                 la->la_mode &= ~S_ISGID;
827         }
828
829         /* Make sure a caller can chmod. */
830         if (la->la_valid & LA_MODE) {
831                 if (!(flags & MDS_PERM_BYPASS) &&
832                     (uc->uc_fsuid != tmp_la->la_uid) &&
833                     !mdd_capable(uc, CFS_CAP_FOWNER))
834                         RETURN(-EPERM);
835
836                 if (la->la_mode == (cfs_umode_t) -1)
837                         la->la_mode = tmp_la->la_mode;
838                 else
839                         la->la_mode = (la->la_mode & S_IALLUGO) |
840                                       (tmp_la->la_mode & ~S_IALLUGO);
841
842                 /* Also check the setgid bit! */
843                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
844                                        la->la_gid : tmp_la->la_gid) &&
845                     !mdd_capable(uc, CFS_CAP_FSETID))
846                         la->la_mode &= ~S_ISGID;
847         } else {
848                la->la_mode = tmp_la->la_mode;
849         }
850
851         /* Make sure a caller can chown. */
852         if (la->la_valid & LA_UID) {
853                 if (la->la_uid == (uid_t) -1)
854                         la->la_uid = tmp_la->la_uid;
855                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
856                      (la->la_uid != tmp_la->la_uid)) &&
857                     !mdd_capable(uc, CFS_CAP_CHOWN))
858                         RETURN(-EPERM);
859
860                 /* If the user or group of a non-directory has been
861                  * changed by a non-root user, remove the setuid bit.
862                  * 19981026 David C Niemi <niemi@tux.org>
863                  *
864                  * Changed this to apply to all users, including root,
865                  * to avoid some races. This is the behavior we had in
866                  * 2.0. The check for non-root was definitely wrong
867                  * for 2.2 anyway, as it should have been using
868                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
869                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
870                     !S_ISDIR(tmp_la->la_mode)) {
871                         la->la_mode &= ~S_ISUID;
872                         la->la_valid |= LA_MODE;
873                 }
874         }
875
876         /* Make sure caller can chgrp. */
877         if (la->la_valid & LA_GID) {
878                 if (la->la_gid == (gid_t) -1)
879                         la->la_gid = tmp_la->la_gid;
880                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
881                      ((la->la_gid != tmp_la->la_gid) &&
882                       !lustre_in_group_p(uc, la->la_gid))) &&
883                     !mdd_capable(uc, CFS_CAP_CHOWN))
884                         RETURN(-EPERM);
885
886                 /* Likewise, if the user or group of a non-directory
887                  * has been changed by a non-root user, remove the
888                  * setgid bit UNLESS there is no group execute bit
889                  * (this would be a file marked for mandatory
890                  * locking).  19981026 David C Niemi <niemi@tux.org>
891                  *
892                  * Removed the fsuid check (see the comment above) --
893                  * 19990830 SD. */
894                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
895                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
896                         la->la_mode &= ~S_ISGID;
897                         la->la_valid |= LA_MODE;
898                 }
899         }
900
901         /* For both Size-on-MDS case and truncate case,
902          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
903          * We distinguish them by "flags & MDS_SOM".
904          * For SOM case, it is true, the MAY_WRITE perm has been checked
905          * when open, no need check again. For truncate case, it is false,
906          * the MAY_WRITE perm should be checked here. */
907         if (flags & MDS_SOM) {
908                 /* For the "Size-on-MDS" setattr update, merge coming
909                  * attributes with the set in the inode. BUG 10641 */
910                 if ((la->la_valid & LA_ATIME) &&
911                     (la->la_atime <= tmp_la->la_atime))
912                         la->la_valid &= ~LA_ATIME;
913
914                 /* OST attributes do not have a priority over MDS attributes,
915                  * so drop times if ctime is equal. */
916                 if ((la->la_valid & LA_CTIME) &&
917                     (la->la_ctime <= tmp_la->la_ctime))
918                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
919         } else {
920                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
921                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
922                               (uc->uc_fsuid == tmp_la->la_uid)) &&
923                             !(flags & MDS_PERM_BYPASS)) {
924                                 rc = mdd_permission_internal(env, obj,
925                                                              tmp_la, MAY_WRITE);
926                                 if (rc)
927                                         RETURN(rc);
928                         }
929                 }
930                 if (la->la_valid & LA_CTIME) {
931                         /* The pure setattr, it has the priority over what is
932                          * already set, do not drop it if ctime is equal. */
933                         if (la->la_ctime < tmp_la->la_ctime)
934                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
935                                                   LA_CTIME);
936                 }
937         }
938
939         RETURN(0);
940 }
941
942 /** Store a data change changelog record
943  * If this fails, we must fail the whole transaction; we don't
944  * want the change to commit without the log entry.
945  * \param mdd_obj - mdd_object of change
946  * \param handle - transacion handle
947  */
948 static int mdd_changelog_data_store(const struct lu_env *env,
949                                     struct mdd_device *mdd,
950                                     enum changelog_rec_type type,
951                                     int flags, struct mdd_object *mdd_obj,
952                                     struct thandle *handle)
953 {
954         const struct lu_fid             *tfid = mdo2fid(mdd_obj);
955         struct llog_changelog_rec       *rec;
956         struct lu_buf                   *buf;
957         int                              reclen;
958         int                              rc;
959
960         /* Not recording */
961         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
962                 RETURN(0);
963         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
964                 RETURN(0);
965
966         LASSERT(mdd_obj != NULL);
967         LASSERT(handle != NULL);
968
969         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
970             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
971                 /* Don't need multiple updates in this log */
972                 /* Don't check under lock - no big deal if we get an extra
973                    entry */
974                 RETURN(0);
975         }
976
977         reclen = llog_data_len(sizeof(*rec));
978         buf = mdd_buf_alloc(env, reclen);
979         if (buf->lb_buf == NULL)
980                 RETURN(-ENOMEM);
981         rec = buf->lb_buf;
982
983         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
984         rec->cr.cr_type = (__u32)type;
985         rec->cr.cr_tfid = *tfid;
986         rec->cr.cr_namelen = 0;
987         mdd_obj->mod_cltime = cfs_time_current_64();
988
989         rc = mdd_changelog_store(env, mdd, rec, handle);
990
991         RETURN(rc);
992 }
993
994 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
995                   int flags, struct md_object *obj)
996 {
997         struct thandle *handle;
998         struct mdd_object *mdd_obj = md2mdd_obj(obj);
999         struct mdd_device *mdd = mdo2mdd(obj);
1000         int rc;
1001         ENTRY;
1002
1003         handle = mdd_trans_create(env, mdd);
1004         if (IS_ERR(handle))
1005                 RETURN(PTR_ERR(handle));
1006
1007         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1008         if (rc)
1009                 GOTO(stop, rc);
1010
1011         rc = mdd_trans_start(env, mdd, handle);
1012         if (rc)
1013                 GOTO(stop, rc);
1014
1015         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1016                                       handle);
1017
1018 stop:
1019         mdd_trans_stop(env, mdd, rc, handle);
1020
1021         RETURN(rc);
1022 }
1023
1024 /**
1025  * Save LMA extended attributes with data from \a ma.
1026  *
1027  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1028  * not, LMA EA will be first read from disk, modified and write back.
1029  *
1030  */
1031 /* Precedence for choosing record type when multiple
1032  * attributes change: setattr > mtime > ctime > atime
1033  * (ctime changes when mtime does, plus chmod/chown.
1034  * atime and ctime are independent.) */
1035 static int mdd_attr_set_changelog(const struct lu_env *env,
1036                                   struct md_object *obj, struct thandle *handle,
1037                                   __u64 valid)
1038 {
1039         struct mdd_device *mdd = mdo2mdd(obj);
1040         int bits, type = 0;
1041
1042         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1043         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1044         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1045         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1046         bits = bits & mdd->mdd_cl.mc_mask;
1047         if (bits == 0)
1048                 return 0;
1049
1050         /* The record type is the lowest non-masked set bit */
1051         while (bits && ((bits & 1) == 0)) {
1052                 bits = bits >> 1;
1053                 type++;
1054         }
1055
1056         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1057         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1058                                         md2mdd_obj(obj), handle);
1059 }
1060
1061 static int mdd_declare_attr_set(const struct lu_env *env,
1062                                 struct mdd_device *mdd,
1063                                 struct mdd_object *obj,
1064                                 const struct lu_attr *attr,
1065                                 struct thandle *handle)
1066 {
1067         int rc;
1068
1069         rc = mdo_declare_attr_set(env, obj, attr, handle);
1070         if (rc)
1071                 return rc;
1072
1073 #ifdef CONFIG_FS_POSIX_ACL
1074         if (attr->la_valid & LA_MODE) {
1075                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1076                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1077                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1078                 mdd_read_unlock(env, obj);
1079                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1080                         rc = 0;
1081                 else if (rc < 0)
1082                         return rc;
1083
1084                 if (rc != 0) {
1085                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1086                         rc = mdo_declare_xattr_set(env, obj, buf,
1087                                                    XATTR_NAME_ACL_ACCESS, 0,
1088                                                    handle);
1089                         if (rc)
1090                                 return rc;
1091                 }
1092         }
1093 #endif
1094
1095         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1096         return rc;
1097 }
1098
1099 /* set attr and LOV EA at once, return updated attr */
1100 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1101                  const struct md_attr *ma)
1102 {
1103         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1104         struct mdd_device *mdd = mdo2mdd(obj);
1105         struct thandle *handle;
1106         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1107         const struct lu_attr *la = &ma->ma_attr;
1108         int rc;
1109         ENTRY;
1110
1111         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1112         LASSERT((ma->ma_valid & MA_LOV) == 0);
1113         LASSERT((ma->ma_valid & MA_HSM) == 0);
1114         LASSERT((ma->ma_valid & MA_SOM) == 0);
1115
1116         *la_copy = ma->ma_attr;
1117         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1118         if (rc)
1119                 RETURN(rc);
1120
1121         /* setattr on "close" only change atime, or do nothing */
1122         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1123                 RETURN(0);
1124
1125         handle = mdd_trans_create(env, mdd);
1126         if (IS_ERR(handle))
1127                 RETURN(PTR_ERR(handle));
1128
1129         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1130         if (rc)
1131                 GOTO(stop, rc);
1132
1133         rc = mdd_trans_start(env, mdd, handle);
1134         if (rc)
1135                 GOTO(stop, rc);
1136
1137         /* permission changes may require sync operation */
1138         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1139                 handle->th_sync |= !!mdd->mdd_sync_permission;
1140
1141         if (la->la_valid & (LA_MTIME | LA_CTIME))
1142                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1143                        la->la_mtime, la->la_ctime);
1144
1145         if (la_copy->la_valid & LA_FLAGS) {
1146                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1147                 if (rc == 0)
1148                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1149         } else if (la_copy->la_valid) {            /* setattr */
1150                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1151         }
1152
1153         if (rc == 0)
1154                 rc = mdd_attr_set_changelog(env, obj, handle,
1155                                             la->la_valid);
1156 stop:
1157         mdd_trans_stop(env, mdd, rc, handle);
1158         RETURN(rc);
1159 }
1160
1161 static int mdd_xattr_sanity_check(const struct lu_env *env,
1162                                   struct mdd_object *obj)
1163 {
1164         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1165         struct lu_ucred *uc     = lu_ucred_assert(env);
1166         int rc;
1167         ENTRY;
1168
1169         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1170                 RETURN(-EPERM);
1171
1172         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1173         if (rc)
1174                 RETURN(rc);
1175
1176         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1177             !mdd_capable(uc, CFS_CAP_FOWNER))
1178                 RETURN(-EPERM);
1179
1180         RETURN(rc);
1181 }
1182
1183 static int mdd_declare_xattr_set(const struct lu_env *env,
1184                                  struct mdd_device *mdd,
1185                                  struct mdd_object *obj,
1186                                  const struct lu_buf *buf,
1187                                  const char *name,
1188                                  int fl, struct thandle *handle)
1189 {
1190         int     rc;
1191
1192         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1193         if (rc)
1194                 return rc;
1195
1196         /* Only record user xattr changes */
1197         if ((strncmp("user.", name, 5) == 0)) {
1198                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1199                 if (rc)
1200                         return rc;
1201         }
1202
1203         /* If HSM data is modified, this could add a changelog */
1204         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
1205                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1206
1207         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1208         return rc;
1209 }
1210
1211 /*
1212  * Compare current and future data of HSM EA and add a changelog if needed.
1213  *
1214  * Caller should have write-locked \param obj.
1215  *
1216  * \param buf - Future HSM EA content.
1217  * \retval 0 if no changelog is needed or changelog was added properly.
1218  * \retval -ve errno if there was a problem
1219  */
1220 static int mdd_hsm_update_locked(const struct lu_env *env,
1221                                  struct md_object *obj,
1222                                  const struct lu_buf *buf,
1223                                  struct thandle *handle)
1224 {
1225         struct mdd_thread_info *info = mdd_env_info(env);
1226         struct mdd_device      *mdd = mdo2mdd(obj);
1227         struct mdd_object      *mdd_obj = md2mdd_obj(obj);
1228         struct lu_buf          *current_buf = &info->mti_buf;
1229         struct md_hsm          *current_mh;
1230         struct md_hsm          *new_mh;
1231         int                     rc;
1232         ENTRY;
1233
1234         OBD_ALLOC_PTR(current_mh);
1235         if (current_mh == NULL)
1236                 RETURN(-ENOMEM);
1237
1238         /* Read HSM attrs from disk */
1239         current_buf->lb_buf = info->mti_xattr_buf;
1240         current_buf->lb_len = sizeof(info->mti_xattr_buf);
1241         CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
1242         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
1243                            mdd_object_capa(env, mdd_obj));
1244         rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
1245         if (rc < 0 && rc != -ENODATA)
1246                 GOTO(free, rc);
1247         else if (rc == -ENODATA)
1248                 current_mh->mh_flags = 0;
1249
1250         /* Map future HSM xattr */
1251         OBD_ALLOC_PTR(new_mh);
1252         if (new_mh == NULL)
1253                 GOTO(free, rc = -ENOMEM);
1254         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1255
1256         /* If HSM flags are different, add a changelog */
1257         rc = 0;
1258         if (current_mh->mh_flags != new_mh->mh_flags) {
1259                 int flags = 0;
1260                 hsm_set_cl_event(&flags, HE_STATE);
1261                 if (new_mh->mh_flags & HS_DIRTY)
1262                         hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
1263
1264                 rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
1265                                               handle);
1266         }
1267
1268         OBD_FREE_PTR(new_mh);
1269 free:
1270         OBD_FREE_PTR(current_mh);
1271         return(rc);
1272 }
1273
1274
1275 /**
1276  * The caller should guarantee to update the object ctime
1277  * after xattr_set if needed.
1278  */
1279 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1280                          const struct lu_buf *buf, const char *name,
1281                          int fl)
1282 {
1283         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1284         struct mdd_device       *mdd = mdo2mdd(obj);
1285         struct thandle          *handle;
1286         int                      rc;
1287         ENTRY;
1288
1289         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1290                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1291                 RETURN(rc);
1292         }
1293
1294         rc = mdd_xattr_sanity_check(env, mdd_obj);
1295         if (rc)
1296                 RETURN(rc);
1297
1298         handle = mdd_trans_create(env, mdd);
1299         if (IS_ERR(handle))
1300                 RETURN(PTR_ERR(handle));
1301
1302         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
1303         if (rc)
1304                 GOTO(stop, rc);
1305
1306         rc = mdd_trans_start(env, mdd, handle);
1307         if (rc)
1308                 GOTO(stop, rc);
1309
1310         /* security-replated changes may require sync */
1311         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1312                 handle->th_sync |= !!mdd->mdd_sync_permission;
1313
1314         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1315
1316         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
1317                 rc = mdd_hsm_update_locked(env, obj, buf, handle);
1318                 if (rc) {
1319                         mdd_write_unlock(env, mdd_obj);
1320                         GOTO(stop, rc);
1321                 }
1322         }
1323
1324         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1325                            mdd_object_capa(env, mdd_obj));
1326         mdd_write_unlock(env, mdd_obj);
1327         if (rc)
1328                 GOTO(stop, rc);
1329
1330         /* Only record system & user xattr changes */
1331         if (strncmp(XATTR_USER_PREFIX, name,
1332                         sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1333             strncmp(POSIX_ACL_XATTR_ACCESS, name,
1334                         sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1335             strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1336                         sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1337                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1338                                               handle);
1339
1340 stop:
1341         mdd_trans_stop(env, mdd, rc, handle);
1342
1343         RETURN(rc);
1344 }
1345
1346 static int mdd_declare_xattr_del(const struct lu_env *env,
1347                                  struct mdd_device *mdd,
1348                                  struct mdd_object *obj,
1349                                  const char *name,
1350                                  struct thandle *handle)
1351 {
1352         int rc;
1353
1354         rc = mdo_declare_xattr_del(env, obj, name, handle);
1355         if (rc)
1356                 return rc;
1357
1358         /* Only record user xattr changes */
1359         if ((strncmp("user.", name, 5) == 0))
1360                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1361
1362         return rc;
1363 }
1364
1365 /**
1366  * The caller should guarantee to update the object ctime
1367  * after xattr_set if needed.
1368  */
1369 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1370                   const char *name)
1371 {
1372         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1373         struct mdd_device *mdd = mdo2mdd(obj);
1374         struct thandle *handle;
1375         int  rc;
1376         ENTRY;
1377
1378         rc = mdd_xattr_sanity_check(env, mdd_obj);
1379         if (rc)
1380                 RETURN(rc);
1381
1382         handle = mdd_trans_create(env, mdd);
1383         if (IS_ERR(handle))
1384                 RETURN(PTR_ERR(handle));
1385
1386         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1387         if (rc)
1388                 GOTO(stop, rc);
1389
1390         rc = mdd_trans_start(env, mdd, handle);
1391         if (rc)
1392                 GOTO(stop, rc);
1393
1394         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1395         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1396                            mdd_object_capa(env, mdd_obj));
1397         mdd_write_unlock(env, mdd_obj);
1398         if (rc)
1399                 GOTO(stop, rc);
1400
1401         /* Only record system & user xattr changes */
1402         if (strncmp(XATTR_USER_PREFIX, name,
1403                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1404                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1405                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1406                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1407                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1408                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1409                                               handle);
1410
1411 stop:
1412         mdd_trans_stop(env, mdd, rc, handle);
1413
1414         RETURN(rc);
1415 }
1416
1417 /*
1418  * read lov EA of an object
1419  * return the lov EA in an allocated lu_buf
1420  */
1421 static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
1422                                      struct mdd_object *obj)
1423 {
1424         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1425         struct lu_buf   *lmm_buf = NULL;
1426         int              rc, sz;
1427         ENTRY;
1428
1429 repeat:
1430         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
1431                            mdd_object_capa(env, obj));
1432         if (rc < 0)
1433                 GOTO(out, rc);
1434
1435         if (rc == 0)
1436                 GOTO(out, rc = -ENODATA);
1437
1438         sz = rc;
1439         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1440                 /* mti_big_buf was not allocated, so we have to
1441                  * allocate it based on the ea size */
1442                 buf = mdd_buf_alloc(env, sz);
1443                 if (buf->lb_buf == NULL)
1444                         GOTO(out, rc = -ENOMEM);
1445                 goto repeat;
1446         }
1447
1448         OBD_ALLOC_PTR(lmm_buf);
1449         if (!lmm_buf)
1450                 GOTO(out, rc = -ENOMEM);
1451
1452         OBD_ALLOC(lmm_buf->lb_buf, sz);
1453         if (!lmm_buf->lb_buf)
1454                 GOTO(free, rc = -ENOMEM);
1455
1456         memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
1457         lmm_buf->lb_len = sz;
1458
1459         GOTO(out, rc = 0);
1460
1461 free:
1462         if (lmm_buf)
1463                 OBD_FREE_PTR(lmm_buf);
1464 out:
1465         if (rc)
1466                 return ERR_PTR(rc);
1467         return lmm_buf;
1468 }
1469
1470
1471 /*
1472  *  check if layout swapping between 2 objects is allowed
1473  *  the rules are:
1474  *  - same type of objects
1475  *  - same owner/group (so quotas are still valid)
1476  */
1477 static int mdd_layout_swap_allowed(const struct lu_env *env,
1478                                    struct mdd_object *o1,
1479                                    struct mdd_object *o2)
1480 {
1481         const struct lu_fid     *fid1, *fid2;
1482         __u32                    uid, gid;
1483         struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
1484         int                      rc;
1485         ENTRY;
1486
1487         fid1 = mdo2fid(o1);
1488         fid2 = mdo2fid(o2);
1489
1490         if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
1491             (mdd_object_type(o1) != mdd_object_type(o2)))
1492                 RETURN(-EPERM);
1493
1494         tmp_la->la_valid = 0;
1495         rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
1496         if (rc)
1497                 RETURN(rc);
1498         uid = tmp_la->la_uid;
1499         gid = tmp_la->la_gid;
1500
1501         tmp_la->la_valid = 0;
1502         rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
1503         if (rc)
1504                 RETURN(rc);
1505
1506         if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
1507                 RETURN(-EPERM);
1508
1509         RETURN(0);
1510 }
1511
1512 /**
1513  * swap layouts between 2 lustre objects
1514  */
1515 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1516                             struct md_object *obj2, __u64 flags)
1517 {
1518         struct mdd_object       *o1, *o2, *fst_o, *snd_o;
1519         struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
1520         struct lu_buf           *fst_buf, *snd_buf;
1521         struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
1522         struct thandle          *handle;
1523         struct mdd_device       *mdd = mdo2mdd(obj1);
1524         int                      rc;
1525         __u16                    fst_gen, snd_gen;
1526         ENTRY;
1527
1528         /* we have to sort the 2 obj, so locking will always
1529          * be in the same order, even in case of 2 concurrent swaps */
1530         rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
1531                        mdo2fid(md2mdd_obj(obj2)));
1532         /* same fid ? */
1533         if (rc == 0)
1534                 RETURN(-EPERM);
1535
1536         if (rc > 0) {
1537                 o1 = md2mdd_obj(obj1);
1538                 o2 = md2mdd_obj(obj2);
1539         } else {
1540                 o1 = md2mdd_obj(obj2);
1541                 o2 = md2mdd_obj(obj1);
1542         }
1543
1544         /* check if layout swapping is allowed */
1545         rc = mdd_layout_swap_allowed(env, o1, o2);
1546         if (rc)
1547                 RETURN(rc);
1548
1549         handle = mdd_trans_create(env, mdd);
1550         if (IS_ERR(handle))
1551                 RETURN(PTR_ERR(handle));
1552
1553         /* objects are already sorted */
1554         mdd_write_lock(env, o1, MOR_TGT_CHILD);
1555         mdd_write_lock(env, o2, MOR_TGT_CHILD);
1556
1557         lmm1_buf = mdd_get_lov_ea(env, o1);
1558         if (IS_ERR(lmm1_buf)) {
1559                 rc = PTR_ERR(lmm1_buf);
1560                 lmm1_buf = NULL;
1561                 if (rc != -ENODATA)
1562                         GOTO(unlock, rc);
1563         }
1564
1565         lmm2_buf = mdd_get_lov_ea(env, o2);
1566         if (IS_ERR(lmm2_buf)) {
1567                 rc = PTR_ERR(lmm2_buf);
1568                 lmm2_buf = NULL;
1569                 if (rc != -ENODATA)
1570                         GOTO(unlock, rc);
1571         }
1572
1573         /* swapping 2 non existant layouts is a success */
1574         if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
1575                 GOTO(unlock, rc = 0);
1576
1577         /* to help inode migration between MDT, it is better to
1578          * start by the no layout file (if one), so we order the swap */
1579         if (lmm1_buf == NULL) {
1580                 fst_o = o1;
1581                 fst_buf = lmm1_buf;
1582                 snd_o = o2;
1583                 snd_buf = lmm2_buf;
1584         } else {
1585                 fst_o = o2;
1586                 fst_buf = lmm2_buf;
1587                 snd_o = o1;
1588                 snd_buf = lmm1_buf;
1589         }
1590
1591         /* lmm and generation layout initialization */
1592         if (fst_buf) {
1593                 fst_lmm = fst_buf->lb_buf;
1594                 fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
1595         } else {
1596                 fst_lmm = NULL;
1597                 fst_gen = 0;
1598         }
1599
1600         if (snd_buf) {
1601                 snd_lmm = snd_buf->lb_buf;
1602                 snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
1603         } else {
1604                 snd_lmm = NULL;
1605                 snd_gen = 0;
1606         }
1607
1608         /* save the orignal lmm common header of first file
1609          * to be able to roll back */
1610         OBD_ALLOC_PTR(old_fst_lmm);
1611         if (old_fst_lmm == NULL)
1612                 GOTO(unlock, rc = -ENOMEM);
1613
1614         memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
1615
1616         /* increase the generation layout numbers */
1617         snd_gen++;
1618         fst_gen++;
1619
1620         /* set the file specific informations in lmm */
1621         if (fst_lmm) {
1622                 fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
1623                 fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
1624                 fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
1625         }
1626
1627         if (snd_lmm) {
1628                 snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
1629                 snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1630                 snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1631         }
1632
1633         /* prepare transaction */
1634         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
1635                                    LU_XATTR_REPLACE, handle);
1636         if (rc)
1637                 GOTO(stop, rc);
1638
1639         rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
1640                                    LU_XATTR_REPLACE, handle);
1641         if (rc)
1642                 GOTO(stop, rc);
1643
1644         rc = mdd_trans_start(env, mdd, handle);
1645         if (rc)
1646                 GOTO(stop, rc);
1647
1648         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
1649                            LU_XATTR_REPLACE, handle,
1650                            mdd_object_capa(env, fst_o));
1651         if (rc)
1652                 GOTO(stop, rc);
1653
1654         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
1655                            LU_XATTR_REPLACE, handle,
1656                            mdd_object_capa(env, snd_o));
1657         if (rc) {
1658                 int     rc2;
1659
1660                 /* failure on second file, but first was done, so we have
1661                  * to roll back first */
1662                 /* restore object_id, object_seq and generation number
1663                  * on first file */
1664                 if (fst_lmm) {
1665                         fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1666                         fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1667                         fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
1668                 }
1669
1670                 rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
1671                                     LU_XATTR_REPLACE, handle,
1672                                     mdd_object_capa(env, fst_o));
1673                 if (rc2) {
1674                         /* very bad day */
1675                         CERROR("%s: unable to roll back after swap layouts"
1676                                " failure between "DFID" and "DFID
1677                                " rc2 = %d rc = %d)\n",
1678                                mdd2obd_dev(mdd)->obd_name,
1679                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
1680                                rc2, rc);
1681                         /* a solution to avoid journal commit is to panic,
1682                          * but it has strong consequences so we use LBUG to
1683                          * allow sysdamin to choose to panic or not
1684                          */
1685                         LBUG();
1686                 }
1687                 GOTO(stop, rc);
1688         }
1689         EXIT;
1690
1691 stop:
1692         mdd_trans_stop(env, mdd, rc, handle);
1693 unlock:
1694         mdd_write_unlock(env, o2);
1695         mdd_write_unlock(env, o1);
1696
1697         if (lmm1_buf && lmm1_buf->lb_buf)
1698                 OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
1699         if (lmm1_buf)
1700                 OBD_FREE_PTR(lmm1_buf);
1701
1702         if (lmm2_buf && lmm2_buf->lb_buf)
1703                 OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
1704         if (lmm2_buf)
1705                 OBD_FREE_PTR(lmm2_buf);
1706
1707         if (old_fst_lmm)
1708                 OBD_FREE_PTR(old_fst_lmm);
1709
1710         return rc;
1711 }
1712
1713 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1714                 struct mdd_object *child, struct lu_attr *attr)
1715 {
1716         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1717         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1718         struct dt_object *nc = mdd_object_child(child);
1719
1720         /* @hint will be initialized by underlying device. */
1721         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1722 }
1723
1724 /*
1725  * do NOT or the MAY_*'s, you'll get the weakest
1726  */
1727 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1728 {
1729         int res = 0;
1730
1731         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1732          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1733          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1734          * owner can write to a file even if it is marked readonly to hide
1735          * its brokenness. (bug 5781) */
1736         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1737                 struct lu_ucred *uc = lu_ucred_check(env);
1738
1739                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1740                         return 0;
1741         }
1742
1743         if (flags & FMODE_READ)
1744                 res |= MAY_READ;
1745         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1746                 res |= MAY_WRITE;
1747         if (flags & MDS_FMODE_EXEC)
1748                 res = MAY_EXEC;
1749         return res;
1750 }
1751
1752 static int mdd_open_sanity_check(const struct lu_env *env,
1753                                  struct mdd_object *obj, int flag)
1754 {
1755         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1756         int mode, rc;
1757         ENTRY;
1758
1759         /* EEXIST check */
1760         if (mdd_is_dead_obj(obj))
1761                 RETURN(-ENOENT);
1762
1763         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1764         if (rc)
1765                RETURN(rc);
1766
1767         if (S_ISLNK(tmp_la->la_mode))
1768                 RETURN(-ELOOP);
1769
1770         mode = accmode(env, tmp_la, flag);
1771
1772         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1773                 RETURN(-EISDIR);
1774
1775         if (!(flag & MDS_OPEN_CREATED)) {
1776                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1777                 if (rc)
1778                         RETURN(rc);
1779         }
1780
1781         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1782             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1783                 flag &= ~MDS_OPEN_TRUNC;
1784
1785         /* For writing append-only file must open it with append mode. */
1786         if (mdd_is_append(obj)) {
1787                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1788                         RETURN(-EPERM);
1789                 if (flag & MDS_OPEN_TRUNC)
1790                         RETURN(-EPERM);
1791         }
1792
1793 #if 0
1794         /*
1795          * Now, flag -- O_NOATIME does not be packed by client.
1796          */
1797         if (flag & O_NOATIME) {
1798                 struct lu_ucred *uc = lu_ucred(env);
1799
1800                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1801                            (uc->uc_valid == UCRED_NEW)) &&
1802                     (uc->uc_fsuid != tmp_la->la_uid) &&
1803                     !mdd_capable(uc, CFS_CAP_FOWNER))
1804                         RETURN(-EPERM);
1805         }
1806 #endif
1807
1808         RETURN(0);
1809 }
1810
1811 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1812                     int flags)
1813 {
1814         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1815         int rc = 0;
1816
1817         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1818
1819         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1820         if (rc == 0)
1821                 mdd_obj->mod_count++;
1822
1823         mdd_write_unlock(env, mdd_obj);
1824         return rc;
1825 }
1826
1827 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1828                             struct md_attr *ma, struct thandle *handle)
1829 {
1830         return mdo_declare_destroy(env, obj, handle);
1831 }
1832
1833 /* return md_attr back,
1834  * if it is last unlink then return lov ea + llog cookie*/
1835 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1836                     struct md_attr *ma, struct thandle *handle)
1837 {
1838         int rc;
1839         ENTRY;
1840
1841         rc = mdo_destroy(env, obj, handle);
1842
1843         RETURN(rc);
1844 }
1845
1846 static int mdd_declare_close(const struct lu_env *env,
1847                              struct mdd_object *obj,
1848                              struct md_attr *ma,
1849                              struct thandle *handle)
1850 {
1851         int rc;
1852
1853         rc = orph_declare_index_delete(env, obj, handle);
1854         if (rc)
1855                 return rc;
1856
1857         return mdo_declare_destroy(env, obj, handle);
1858 }
1859
1860 /*
1861  * No permission check is needed.
1862  */
1863 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1864                      struct md_attr *ma, int mode)
1865 {
1866         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1867         struct mdd_device *mdd = mdo2mdd(obj);
1868         struct thandle    *handle = NULL;
1869         int rc, is_orphan = 0;
1870         ENTRY;
1871
1872         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1873                 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1874                 mdd_obj->mod_count--;
1875                 mdd_write_unlock(env, mdd_obj);
1876
1877                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1878                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1879                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1880                 RETURN(0);
1881         }
1882
1883         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
1884          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
1885         /* check without any lock */
1886         is_orphan = mdd_obj->mod_count == 1 &&
1887                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
1888
1889 again:
1890         if (is_orphan) {
1891                 handle = mdd_trans_create(env, mdo2mdd(obj));
1892                 if (IS_ERR(handle))
1893                         RETURN(PTR_ERR(handle));
1894
1895                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1896                 if (rc)
1897                         GOTO(stop, rc);
1898
1899                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1900                 if (rc)
1901                         GOTO(stop, rc);
1902
1903                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1904                 if (rc)
1905                         GOTO(stop, rc);
1906         }
1907
1908         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1909         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1910                         mdd_object_capa(env, mdd_obj));
1911         if (rc != 0) {
1912                 CERROR("Failed to get lu_attr of "DFID": %d\n",
1913                        PFID(mdd_object_fid(mdd_obj)), rc);
1914                 GOTO(out, rc);
1915         }
1916
1917         /* check again with lock */
1918         is_orphan = (mdd_obj->mod_count == 1) &&
1919                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
1920                      ma->ma_attr.la_nlink == 0);
1921
1922         if (is_orphan && handle == NULL) {
1923                 mdd_write_unlock(env, mdd_obj);
1924                 goto again;
1925         }
1926
1927         mdd_obj->mod_count--; /*release open count */
1928
1929         if (!is_orphan)
1930                 GOTO(out, rc = 0);
1931
1932         /* Orphan object */
1933         /* NB: Object maybe not in orphan list originally, it is rare case for
1934          * mdd_finish_unlink() failure, in that case, the object doesn't have
1935          * ORPHAN_OBJ flag */
1936         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1937                 /* remove link to object from orphan index */
1938                 LASSERT(handle != NULL);
1939                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1940                 if (rc != 0) {
1941                         CERROR("%s: unable to delete "DFID" from orphan list: "
1942                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1943                                PFID(mdd_object_fid(mdd_obj)), rc);
1944                         /* If object was not deleted from orphan list, do not
1945                          * destroy OSS objects, which will be done when next
1946                          * recovery. */
1947                         GOTO(out, rc);
1948                 }
1949
1950                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1951                        "list, OSS objects to be destroyed.\n",
1952                        PFID(mdd_object_fid(mdd_obj)));
1953         }
1954
1955         rc = mdo_destroy(env, mdd_obj, handle);
1956
1957         if (rc != 0) {
1958                 CERROR("%s: unable to delete "DFID" from orphan list: "
1959                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1960                        PFID(mdd_object_fid(mdd_obj)), rc);
1961         }
1962         EXIT;
1963
1964 out:
1965         mdd_write_unlock(env, mdd_obj);
1966
1967         if (rc == 0 &&
1968             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1969             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1970                 if (handle == NULL) {
1971                         handle = mdd_trans_create(env, mdo2mdd(obj));
1972                         if (IS_ERR(handle))
1973                                 GOTO(stop, rc = IS_ERR(handle));
1974
1975                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1976                                                          handle);
1977                         if (rc)
1978                                 GOTO(stop, rc);
1979
1980                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1981                         if (rc)
1982                                 GOTO(stop, rc);
1983                 }
1984
1985                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1986                                          mdd_obj, handle);
1987         }
1988
1989 stop:
1990         if (handle != NULL)
1991                 mdd_trans_stop(env, mdd, rc, handle);
1992         return rc;
1993 }
1994
1995 /*
1996  * Permission check is done when open,
1997  * no need check again.
1998  */
1999 static int mdd_readpage_sanity_check(const struct lu_env *env,
2000                                      struct mdd_object *obj)
2001 {
2002         struct dt_object *next = mdd_object_child(obj);
2003         int rc;
2004         ENTRY;
2005
2006         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2007                 rc = 0;
2008         else
2009                 rc = -ENOTDIR;
2010
2011         RETURN(rc);
2012 }
2013
2014 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2015                               int nob, const struct dt_it_ops *iops,
2016                               struct dt_it *it, __u32 attr, void *arg)
2017 {
2018         struct lu_dirpage       *dp = &lp->lp_dir;
2019         void                    *area = dp;
2020         int                      result;
2021         __u64                    hash = 0;
2022         struct lu_dirent        *ent;
2023         struct lu_dirent        *last = NULL;
2024         int                      first = 1;
2025
2026         memset(area, 0, sizeof (*dp));
2027         area += sizeof (*dp);
2028         nob  -= sizeof (*dp);
2029
2030         ent  = area;
2031         do {
2032                 int    len;
2033                 int    recsize;
2034
2035                 len = iops->key_size(env, it);
2036
2037                 /* IAM iterator can return record with zero len. */
2038                 if (len == 0)
2039                         goto next;
2040
2041                 hash = iops->store(env, it);
2042                 if (unlikely(first)) {
2043                         first = 0;
2044                         dp->ldp_hash_start = cpu_to_le64(hash);
2045                 }
2046
2047                 /* calculate max space required for lu_dirent */
2048                 recsize = lu_dirent_calc_size(len, attr);
2049
2050                 if (nob >= recsize) {
2051                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2052                         if (result == -ESTALE)
2053                                 goto next;
2054                         if (result != 0)
2055                                 goto out;
2056
2057                         /* osd might not able to pack all attributes,
2058                          * so recheck rec length */
2059                         recsize = le16_to_cpu(ent->lde_reclen);
2060                 } else {
2061                         result = (last != NULL) ? 0 :-EINVAL;
2062                         goto out;
2063                 }
2064                 last = ent;
2065                 ent = (void *)ent + recsize;
2066                 nob -= recsize;
2067
2068 next:
2069                 result = iops->next(env, it);
2070                 if (result == -ESTALE)
2071                         goto next;
2072         } while (result == 0);
2073
2074 out:
2075         dp->ldp_hash_end = cpu_to_le64(hash);
2076         if (last != NULL) {
2077                 if (last->lde_hash == dp->ldp_hash_end)
2078                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2079                 last->lde_reclen = 0; /* end mark */
2080         }
2081         if (result > 0)
2082                 /* end of directory */
2083                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2084         else if (result < 0)
2085                 CWARN("build page failed: %d!\n", result);
2086         return result;
2087 }
2088
2089 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2090                  const struct lu_rdpg *rdpg)
2091 {
2092         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2093         int rc;
2094         ENTRY;
2095
2096         if (mdd_object_exists(mdd_obj) == 0) {
2097                 CERROR("%s: object "DFID" not found: rc = -2\n",
2098                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2099                 return -ENOENT;
2100         }
2101
2102         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2103         rc = mdd_readpage_sanity_check(env, mdd_obj);
2104         if (rc)
2105                 GOTO(out_unlock, rc);
2106
2107         if (mdd_is_dead_obj(mdd_obj)) {
2108                 struct page *pg;
2109                 struct lu_dirpage *dp;
2110
2111                 /*
2112                  * According to POSIX, please do not return any entry to client:
2113                  * even dot and dotdot should not be returned.
2114                  */
2115                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2116                        PFID(mdd_object_fid(mdd_obj)));
2117
2118                 if (rdpg->rp_count <= 0)
2119                         GOTO(out_unlock, rc = -EFAULT);
2120                 LASSERT(rdpg->rp_pages != NULL);
2121
2122                 pg = rdpg->rp_pages[0];
2123                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2124                 memset(dp, 0 , sizeof(struct lu_dirpage));
2125                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2126                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2127                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2128                 cfs_kunmap(pg);
2129                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2130         }
2131
2132         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2133                            mdd_dir_page_build, NULL);
2134         if (rc >= 0) {
2135                 struct lu_dirpage       *dp;
2136
2137                 dp = cfs_kmap(rdpg->rp_pages[0]);
2138                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2139                 if (rc == 0) {
2140                         /*
2141                          * No pages were processed, mark this for first page
2142                          * and send back.
2143                          */
2144                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2145                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2146                 }
2147                 cfs_kunmap(rdpg->rp_pages[0]);
2148         }
2149
2150         GOTO(out_unlock, rc);
2151 out_unlock:
2152         mdd_read_unlock(env, mdd_obj);
2153         return rc;
2154 }
2155
2156 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2157 {
2158         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2159
2160         if (mdd_object_exists(mdd_obj) == 0) {
2161                 CERROR("%s: object "DFID" not found: rc = -2\n",
2162                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2163                 return -ENOENT;
2164         }
2165         return dt_object_sync(env, mdd_object_child(mdd_obj));
2166 }
2167
2168 const struct md_object_operations mdd_obj_ops = {
2169         .moo_permission         = mdd_permission,
2170         .moo_attr_get           = mdd_attr_get,
2171         .moo_attr_set           = mdd_attr_set,
2172         .moo_xattr_get          = mdd_xattr_get,
2173         .moo_xattr_set          = mdd_xattr_set,
2174         .moo_xattr_list         = mdd_xattr_list,
2175         .moo_xattr_del          = mdd_xattr_del,
2176         .moo_swap_layouts       = mdd_swap_layouts,
2177         .moo_open               = mdd_open,
2178         .moo_close              = mdd_close,
2179         .moo_readpage           = mdd_readpage,
2180         .moo_readlink           = mdd_readlink,
2181         .moo_changelog          = mdd_changelog,
2182         .moo_capa_get           = mdd_capa_get,
2183         .moo_object_sync        = mdd_object_sync,
2184         .moo_path               = mdd_path,
2185 };