Whamcloud - gitweb
973f893ee83f74e2b31fa49e4f94a52da8f38b7e
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         *buf = LU_BUF_NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 *buf = LU_BUF_NULL;
146         }
147         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         *buf = LU_BUF_NULL;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct lu_object *mdd_object_alloc(const struct lu_env *env,
183                                    const struct lu_object_header *hdr,
184                                    struct lu_device *d)
185 {
186         struct mdd_object *mdd_obj;
187
188         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
189         if (mdd_obj != NULL) {
190                 struct lu_object *o;
191
192                 o = mdd2lu_obj(mdd_obj);
193                 lu_object_init(o, NULL, d);
194                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
195                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
196                 mdd_obj->mod_count = 0;
197                 o->lo_ops = &mdd_lu_obj_ops;
198                 return o;
199         } else {
200                 return NULL;
201         }
202 }
203
204 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
205                            const struct lu_object_conf *unused)
206 {
207         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
208         struct mdd_object *mdd_obj = lu2mdd_obj(o);
209         struct lu_object  *below;
210         struct lu_device  *under;
211         ENTRY;
212
213         mdd_obj->mod_cltime = 0;
214         under = &d->mdd_child->dd_lu_dev;
215         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
216         mdd_pdlock_init(mdd_obj);
217         if (IS_ERR(below))
218                 RETURN(PTR_ERR(below));
219
220         lu_object_add(o, below);
221
222         RETURN(0);
223 }
224
225 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
226 {
227         if (lu_object_exists(o))
228                 return mdd_get_flags(env, lu2mdd_obj(o));
229         else
230                 return 0;
231 }
232
233 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
234 {
235         struct mdd_object *mdd = lu2mdd_obj(o);
236
237         lu_object_fini(o);
238         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
239 }
240
241 static int mdd_object_print(const struct lu_env *env, void *cookie,
242                             lu_printer_t p, const struct lu_object *o)
243 {
244         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
245         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
246                     "valid=%x, cltime="LPU64", flags=%lx)",
247                     mdd, mdd->mod_count, mdd->mod_valid,
248                     mdd->mod_cltime, mdd->mod_flags);
249 }
250
251 static const struct lu_object_operations mdd_lu_obj_ops = {
252         .loo_object_init    = mdd_object_init,
253         .loo_object_start   = mdd_object_start,
254         .loo_object_free    = mdd_object_free,
255         .loo_object_print   = mdd_object_print,
256 };
257
258 struct mdd_object *mdd_object_find(const struct lu_env *env,
259                                    struct mdd_device *d,
260                                    const struct lu_fid *f)
261 {
262         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
263 }
264
265 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
266                         const char *path, struct lu_fid *fid)
267 {
268         struct lu_buf *buf;
269         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
270         struct mdd_object *obj;
271         struct lu_name *lname = &mdd_env_info(env)->mti_name;
272         char *name;
273         int rc = 0;
274         ENTRY;
275
276         /* temp buffer for path element */
277         buf = mdd_buf_alloc(env, PATH_MAX);
278         if (buf->lb_buf == NULL)
279                 RETURN(-ENOMEM);
280
281         lname->ln_name = name = buf->lb_buf;
282         lname->ln_namelen = 0;
283         *f = mdd->mdd_root_fid;
284
285         while(1) {
286                 while (*path == '/')
287                         path++;
288                 if (*path == '\0')
289                         break;
290                 while (*path != '/' && *path != '\0') {
291                         *name = *path;
292                         path++;
293                         name++;
294                         lname->ln_namelen++;
295                 }
296
297                 *name = '\0';
298                 /* find obj corresponding to fid */
299                 obj = mdd_object_find(env, mdd, f);
300                 if (obj == NULL)
301                         GOTO(out, rc = -EREMOTE);
302                 if (IS_ERR(obj))
303                         GOTO(out, rc = PTR_ERR(obj));
304                 /* get child fid from parent and name */
305                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
306                 mdd_object_put(env, obj);
307                 if (rc)
308                         break;
309
310                 name = buf->lb_buf;
311                 lname->ln_namelen = 0;
312         }
313
314         if (!rc)
315                 *fid = *f;
316 out:
317         RETURN(rc);
318 }
319
320 /** The maximum depth that fid2path() will search.
321  * This is limited only because we want to store the fids for
322  * historical path lookup purposes.
323  */
324 #define MAX_PATH_DEPTH 100
325
326 /** mdd_path() lookup structure. */
327 struct path_lookup_info {
328         __u64                pli_recno;        /**< history point */
329         __u64                pli_currec;       /**< current record */
330         struct lu_fid        pli_fid;
331         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
332         struct mdd_object   *pli_mdd_obj;
333         char                *pli_path;         /**< full path */
334         int                  pli_pathlen;
335         int                  pli_linkno;       /**< which hardlink to follow */
336         int                  pli_fidcount;     /**< number of \a pli_fids */
337 };
338
339 static int mdd_path_current(const struct lu_env *env,
340                             struct path_lookup_info *pli)
341 {
342         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
343         struct mdd_object *mdd_obj;
344         struct lu_buf     *buf = NULL;
345         struct link_ea_header *leh;
346         struct link_ea_entry  *lee;
347         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
348         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
349         char *ptr;
350         int reclen;
351         int rc;
352         ENTRY;
353
354         ptr = pli->pli_path + pli->pli_pathlen - 1;
355         *ptr = 0;
356         --ptr;
357         pli->pli_fidcount = 0;
358         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
359
360         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
361                 mdd_obj = mdd_object_find(env, mdd,
362                                           &pli->pli_fids[pli->pli_fidcount]);
363                 if (mdd_obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(mdd_obj))
366                         GOTO(out, rc = PTR_ERR(mdd_obj));
367                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
368                 if (rc <= 0) {
369                         mdd_object_put(env, mdd_obj);
370                         if (rc == -1)
371                                 rc = -EREMOTE;
372                         else if (rc == 0)
373                                 /* Do I need to error out here? */
374                                 rc = -ENOENT;
375                         GOTO(out, rc);
376                 }
377
378                 /* Get parent fid and object name */
379                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
380                 buf = mdd_links_get(env, mdd_obj);
381                 mdd_read_unlock(env, mdd_obj);
382                 mdd_object_put(env, mdd_obj);
383                 if (IS_ERR(buf))
384                         GOTO(out, rc = PTR_ERR(buf));
385
386                 leh = buf->lb_buf;
387                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
388                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
389
390                 /* If set, use link #linkno for path lookup, otherwise use
391                    link #0.  Only do this for the final path element. */
392                 if ((pli->pli_fidcount == 0) &&
393                     (pli->pli_linkno < leh->leh_reccount)) {
394                         int count;
395                         for (count = 0; count < pli->pli_linkno; count++) {
396                                 lee = (struct link_ea_entry *)
397                                      ((char *)lee + reclen);
398                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
399                         }
400                         if (pli->pli_linkno < leh->leh_reccount - 1)
401                                 /* indicate to user there are more links */
402                                 pli->pli_linkno++;
403                 }
404
405                 /* Pack the name in the end of the buffer */
406                 ptr -= tmpname->ln_namelen;
407                 if (ptr - 1 <= pli->pli_path)
408                         GOTO(out, rc = -EOVERFLOW);
409                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
410                 *(--ptr) = '/';
411
412                 /* Store the parent fid for historic lookup */
413                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
414                         GOTO(out, rc = -EOVERFLOW);
415                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
416         }
417
418         /* Verify that our path hasn't changed since we started the lookup.
419            Record the current index, and verify the path resolves to the
420            same fid. If it does, then the path is correct as of this index. */
421         spin_lock(&mdd->mdd_cl.mc_lock);
422         pli->pli_currec = mdd->mdd_cl.mc_index;
423         spin_unlock(&mdd->mdd_cl.mc_lock);
424         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
425         if (rc) {
426                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
427                 GOTO (out, rc = -EAGAIN);
428         }
429         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
430                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
431                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
432                        PFID(&pli->pli_fid));
433                 GOTO(out, rc = -EAGAIN);
434         }
435         ptr++; /* skip leading / */
436         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
437
438         EXIT;
439 out:
440         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
441                 /* if we vmalloced a large buffer drop it */
442                 mdd_buf_put(buf);
443
444         return rc;
445 }
446
447 static int mdd_path_historic(const struct lu_env *env,
448                              struct path_lookup_info *pli)
449 {
450         return 0;
451 }
452
453 /* Returns the full path to this fid, as of changelog record recno. */
454 static int mdd_path(const struct lu_env *env, struct md_object *obj,
455                     char *path, int pathlen, __u64 *recno, int *linkno)
456 {
457         struct path_lookup_info *pli;
458         int tries = 3;
459         int rc = -EAGAIN;
460         ENTRY;
461
462         if (pathlen < 3)
463                 RETURN(-EOVERFLOW);
464
465         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
466                 path[0] = '\0';
467                 RETURN(0);
468         }
469
470         OBD_ALLOC_PTR(pli);
471         if (pli == NULL)
472                 RETURN(-ENOMEM);
473
474         pli->pli_mdd_obj = md2mdd_obj(obj);
475         pli->pli_recno = *recno;
476         pli->pli_path = path;
477         pli->pli_pathlen = pathlen;
478         pli->pli_linkno = *linkno;
479
480         /* Retry multiple times in case file is being moved */
481         while (tries-- && rc == -EAGAIN)
482                 rc = mdd_path_current(env, pli);
483
484         /* For historical path lookup, the current links may not have existed
485          * at "recno" time.  We must switch over to earlier links/parents
486          * by using the changelog records.  If the earlier parent doesn't
487          * exist, we must search back through the changelog to reconstruct
488          * its parents, then check if it exists, etc.
489          * We may ignore this problem for the initial implementation and
490          * state that an "original" hardlink must still exist for us to find
491          * historic path name. */
492         if (pli->pli_recno != -1) {
493                 rc = mdd_path_historic(env, pli);
494         } else {
495                 *recno = pli->pli_currec;
496                 /* Return next link index to caller */
497                 *linkno = pli->pli_linkno;
498         }
499
500         OBD_FREE_PTR(pli);
501
502         RETURN (rc);
503 }
504
505 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
506 {
507         struct lu_attr *la = &mdd_env_info(env)->mti_la;
508         int rc;
509
510         ENTRY;
511         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
512         if (rc == 0) {
513                 mdd_flags_xlate(obj, la->la_flags);
514         }
515         RETURN(rc);
516 }
517
518 /*
519  * No permission check is needed.
520  */
521 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
522                  struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526
527         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
528                           mdd_object_capa(env, md2mdd_obj(obj)));
529         RETURN(rc);
530 }
531
532 /*
533  * No permission check is needed.
534  */
535 static int mdd_xattr_get(const struct lu_env *env,
536                          struct md_object *obj, struct lu_buf *buf,
537                          const char *name)
538 {
539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
540         int rc;
541
542         ENTRY;
543
544         if (mdd_object_exists(mdd_obj) == 0) {
545                 CERROR("%s: object "DFID" not found: rc = -2\n",
546                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
547                 return -ENOENT;
548         }
549
550         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
551         rc = mdo_xattr_get(env, mdd_obj, buf, name,
552                            mdd_object_capa(env, mdd_obj));
553         mdd_read_unlock(env, mdd_obj);
554
555         RETURN(rc);
556 }
557
558 /*
559  * Permission check is done when open,
560  * no need check again.
561  */
562 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
563                         struct lu_buf *buf)
564 {
565         struct mdd_object *mdd_obj = md2mdd_obj(obj);
566         struct dt_object  *next;
567         loff_t             pos = 0;
568         int                rc;
569         ENTRY;
570
571         if (mdd_object_exists(mdd_obj) == 0) {
572                 CERROR("%s: object "DFID" not found: rc = -2\n",
573                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
574                 return -ENOENT;
575         }
576
577         next = mdd_object_child(mdd_obj);
578         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
579         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
580                                          mdd_object_capa(env, mdd_obj));
581         mdd_read_unlock(env, mdd_obj);
582         RETURN(rc);
583 }
584
585 /*
586  * No permission check is needed.
587  */
588 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
589                           struct lu_buf *buf)
590 {
591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
592         int rc;
593
594         ENTRY;
595
596         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
597         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
598         mdd_read_unlock(env, mdd_obj);
599
600         RETURN(rc);
601 }
602
603 int mdd_declare_object_create_internal(const struct lu_env *env,
604                                        struct mdd_object *p,
605                                        struct mdd_object *c,
606                                        struct lu_attr *attr,
607                                        struct thandle *handle,
608                                        const struct md_op_spec *spec)
609 {
610         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
611         const struct dt_index_features *feat = spec->sp_feat;
612         int rc;
613         ENTRY;
614
615         if (feat != &dt_directory_features && feat != NULL) {
616                 dof->dof_type = DFT_INDEX;
617                 dof->u.dof_idx.di_feat = feat;
618
619         } else {
620                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
621                 if (dof->dof_type == DFT_REGULAR) {
622                         dof->u.dof_reg.striped =
623                                 md_should_create(spec->sp_cr_flags);
624                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
625                                 dof->u.dof_reg.striped = 0;
626                         /* is this replay? */
627                         if (spec->no_create)
628                                 dof->u.dof_reg.striped = 0;
629                 }
630         }
631
632         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
633
634         RETURN(rc);
635 }
636
637 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
638                                struct mdd_object *c, struct lu_attr *attr,
639                                struct thandle *handle,
640                                const struct md_op_spec *spec)
641 {
642         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
643         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
644         int rc;
645         ENTRY;
646
647         LASSERT(!mdd_object_exists(c));
648
649         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
650
651         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
652
653         RETURN(rc);
654 }
655
656 /**
657  * Make sure the ctime is increased only.
658  */
659 static inline int mdd_attr_check(const struct lu_env *env,
660                                  struct mdd_object *obj,
661                                  struct lu_attr *attr)
662 {
663         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
664         int rc;
665         ENTRY;
666
667         if (attr->la_valid & LA_CTIME) {
668                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
669                 if (rc)
670                         RETURN(rc);
671
672                 if (attr->la_ctime < tmp_la->la_ctime)
673                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
674                 else if (attr->la_valid == LA_CTIME &&
675                          attr->la_ctime == tmp_la->la_ctime)
676                         attr->la_valid &= ~LA_CTIME;
677         }
678         RETURN(0);
679 }
680
681 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
682                           struct lu_attr *attr, struct thandle *handle,
683                           int needacl)
684 {
685         int rc;
686         ENTRY;
687
688         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
689 #ifdef CONFIG_FS_POSIX_ACL
690         if (!rc && (attr->la_valid & LA_MODE) && needacl)
691                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
692 #endif
693         RETURN(rc);
694 }
695
696 int mdd_attr_check_set_internal(const struct lu_env *env,
697                                 struct mdd_object *obj, struct lu_attr *attr,
698                                 struct thandle *handle, int needacl)
699 {
700         int rc;
701         ENTRY;
702
703         rc = mdd_attr_check(env, obj, attr);
704         if (rc)
705                 RETURN(rc);
706
707         if (attr->la_valid)
708                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
709         RETURN(rc);
710 }
711
712 /*
713  * This gives the same functionality as the code between
714  * sys_chmod and inode_setattr
715  * chown_common and inode_setattr
716  * utimes and inode_setattr
717  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
718  */
719 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
720                         struct lu_attr *la, const unsigned long flags)
721 {
722         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
723         struct lu_ucred  *uc;
724         int               rc;
725         ENTRY;
726
727         if (!la->la_valid)
728                 RETURN(0);
729
730         /* Do not permit change file type */
731         if (la->la_valid & LA_TYPE)
732                 RETURN(-EPERM);
733
734         /* They should not be processed by setattr */
735         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
736                 RETURN(-EPERM);
737
738         /* export destroy does not have ->le_ses, but we may want
739          * to drop LUSTRE_SOM_FL. */
740         uc = lu_ucred_check(env);
741         if (uc == NULL)
742                 RETURN(0);
743
744         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
745         if (rc)
746                 RETURN(rc);
747
748         if (la->la_valid == LA_CTIME) {
749                 if (!(flags & MDS_PERM_BYPASS))
750                         /* This is only for set ctime when rename's source is
751                          * on remote MDS. */
752                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
753                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
754                         la->la_valid &= ~LA_CTIME;
755                 RETURN(rc);
756         }
757
758         if (la->la_valid == LA_ATIME) {
759                 /* This is atime only set for read atime update on close. */
760                 if (la->la_atime >= tmp_la->la_atime &&
761                     la->la_atime < (tmp_la->la_atime +
762                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
763                         la->la_valid &= ~LA_ATIME;
764                 RETURN(0);
765         }
766
767         /* Check if flags change. */
768         if (la->la_valid & LA_FLAGS) {
769                 unsigned int oldflags = 0;
770                 unsigned int newflags = la->la_flags &
771                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
772
773                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
774                     !mdd_capable(uc, CFS_CAP_FOWNER))
775                         RETURN(-EPERM);
776
777                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
778                  * only be changed by the relevant capability. */
779                 if (mdd_is_immutable(obj))
780                         oldflags |= LUSTRE_IMMUTABLE_FL;
781                 if (mdd_is_append(obj))
782                         oldflags |= LUSTRE_APPEND_FL;
783                 if ((oldflags ^ newflags) &&
784                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
785                         RETURN(-EPERM);
786
787                 if (!S_ISDIR(tmp_la->la_mode))
788                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
789         }
790
791         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
792             (la->la_valid & ~LA_FLAGS) &&
793             !(flags & MDS_PERM_BYPASS))
794                 RETURN(-EPERM);
795
796         /* Check for setting the obj time. */
797         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
798             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
799                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
800                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
801                         rc = mdd_permission_internal(env, obj, tmp_la,
802                                                      MAY_WRITE);
803                         if (rc)
804                                 RETURN(rc);
805                 }
806         }
807
808         if (la->la_valid & LA_KILL_SUID) {
809                 la->la_valid &= ~LA_KILL_SUID;
810                 if ((tmp_la->la_mode & S_ISUID) &&
811                     !(la->la_valid & LA_MODE)) {
812                         la->la_mode = tmp_la->la_mode;
813                         la->la_valid |= LA_MODE;
814                 }
815                 la->la_mode &= ~S_ISUID;
816         }
817
818         if (la->la_valid & LA_KILL_SGID) {
819                 la->la_valid &= ~LA_KILL_SGID;
820                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
821                                         (S_ISGID | S_IXGRP)) &&
822                     !(la->la_valid & LA_MODE)) {
823                         la->la_mode = tmp_la->la_mode;
824                         la->la_valid |= LA_MODE;
825                 }
826                 la->la_mode &= ~S_ISGID;
827         }
828
829         /* Make sure a caller can chmod. */
830         if (la->la_valid & LA_MODE) {
831                 if (!(flags & MDS_PERM_BYPASS) &&
832                     (uc->uc_fsuid != tmp_la->la_uid) &&
833                     !mdd_capable(uc, CFS_CAP_FOWNER))
834                         RETURN(-EPERM);
835
836                 if (la->la_mode == (cfs_umode_t) -1)
837                         la->la_mode = tmp_la->la_mode;
838                 else
839                         la->la_mode = (la->la_mode & S_IALLUGO) |
840                                       (tmp_la->la_mode & ~S_IALLUGO);
841
842                 /* Also check the setgid bit! */
843                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
844                                        la->la_gid : tmp_la->la_gid) &&
845                     !mdd_capable(uc, CFS_CAP_FSETID))
846                         la->la_mode &= ~S_ISGID;
847         } else {
848                la->la_mode = tmp_la->la_mode;
849         }
850
851         /* Make sure a caller can chown. */
852         if (la->la_valid & LA_UID) {
853                 if (la->la_uid == (uid_t) -1)
854                         la->la_uid = tmp_la->la_uid;
855                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
856                      (la->la_uid != tmp_la->la_uid)) &&
857                     !mdd_capable(uc, CFS_CAP_CHOWN))
858                         RETURN(-EPERM);
859
860                 /* If the user or group of a non-directory has been
861                  * changed by a non-root user, remove the setuid bit.
862                  * 19981026 David C Niemi <niemi@tux.org>
863                  *
864                  * Changed this to apply to all users, including root,
865                  * to avoid some races. This is the behavior we had in
866                  * 2.0. The check for non-root was definitely wrong
867                  * for 2.2 anyway, as it should have been using
868                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
869                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
870                     !S_ISDIR(tmp_la->la_mode)) {
871                         la->la_mode &= ~S_ISUID;
872                         la->la_valid |= LA_MODE;
873                 }
874         }
875
876         /* Make sure caller can chgrp. */
877         if (la->la_valid & LA_GID) {
878                 if (la->la_gid == (gid_t) -1)
879                         la->la_gid = tmp_la->la_gid;
880                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
881                      ((la->la_gid != tmp_la->la_gid) &&
882                       !lustre_in_group_p(uc, la->la_gid))) &&
883                     !mdd_capable(uc, CFS_CAP_CHOWN))
884                         RETURN(-EPERM);
885
886                 /* Likewise, if the user or group of a non-directory
887                  * has been changed by a non-root user, remove the
888                  * setgid bit UNLESS there is no group execute bit
889                  * (this would be a file marked for mandatory
890                  * locking).  19981026 David C Niemi <niemi@tux.org>
891                  *
892                  * Removed the fsuid check (see the comment above) --
893                  * 19990830 SD. */
894                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
895                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
896                         la->la_mode &= ~S_ISGID;
897                         la->la_valid |= LA_MODE;
898                 }
899         }
900
901         /* For both Size-on-MDS case and truncate case,
902          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
903          * We distinguish them by "flags & MDS_SOM".
904          * For SOM case, it is true, the MAY_WRITE perm has been checked
905          * when open, no need check again. For truncate case, it is false,
906          * the MAY_WRITE perm should be checked here. */
907         if (flags & MDS_SOM) {
908                 /* For the "Size-on-MDS" setattr update, merge coming
909                  * attributes with the set in the inode. BUG 10641 */
910                 if ((la->la_valid & LA_ATIME) &&
911                     (la->la_atime <= tmp_la->la_atime))
912                         la->la_valid &= ~LA_ATIME;
913
914                 /* OST attributes do not have a priority over MDS attributes,
915                  * so drop times if ctime is equal. */
916                 if ((la->la_valid & LA_CTIME) &&
917                     (la->la_ctime <= tmp_la->la_ctime))
918                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
919         } else {
920                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
921                         if (!((flags & MDS_OWNEROVERRIDE) &&
922                               (uc->uc_fsuid == tmp_la->la_uid)) &&
923                             !(flags & MDS_PERM_BYPASS)) {
924                                 rc = mdd_permission_internal(env, obj,
925                                                              tmp_la, MAY_WRITE);
926                                 if (rc != 0)
927                                         RETURN(rc);
928                         }
929                 }
930                 if (la->la_valid & LA_CTIME) {
931                         /* The pure setattr, it has the priority over what is
932                          * already set, do not drop it if ctime is equal. */
933                         if (la->la_ctime < tmp_la->la_ctime)
934                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
935                                                   LA_CTIME);
936                 }
937         }
938
939         RETURN(0);
940 }
941
942 /** Store a data change changelog record
943  * If this fails, we must fail the whole transaction; we don't
944  * want the change to commit without the log entry.
945  * \param mdd_obj - mdd_object of change
946  * \param handle - transacion handle
947  */
948 static int mdd_changelog_data_store(const struct lu_env *env,
949                                     struct mdd_device *mdd,
950                                     enum changelog_rec_type type,
951                                     int flags, struct mdd_object *mdd_obj,
952                                     struct thandle *handle)
953 {
954         const struct lu_fid             *tfid;
955         struct llog_changelog_rec       *rec;
956         struct lu_buf                   *buf;
957         int                              reclen;
958         int                              rc;
959
960         /* Not recording */
961         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
962                 RETURN(0);
963         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
964                 RETURN(0);
965
966         LASSERT(mdd_obj != NULL);
967         LASSERT(handle != NULL);
968
969         tfid = mdo2fid(mdd_obj);
970
971         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
972             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
973                 /* Don't need multiple updates in this log */
974                 /* Don't check under lock - no big deal if we get an extra
975                    entry */
976                 RETURN(0);
977         }
978
979         reclen = llog_data_len(sizeof(*rec));
980         buf = mdd_buf_alloc(env, reclen);
981         if (buf->lb_buf == NULL)
982                 RETURN(-ENOMEM);
983         rec = buf->lb_buf;
984
985         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
986         rec->cr.cr_type = (__u32)type;
987         rec->cr.cr_tfid = *tfid;
988         rec->cr.cr_namelen = 0;
989         mdd_obj->mod_cltime = cfs_time_current_64();
990
991         rc = mdd_changelog_store(env, mdd, rec, handle);
992
993         RETURN(rc);
994 }
995
996 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
997                   int flags, struct md_object *obj)
998 {
999         struct thandle *handle;
1000         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1001         struct mdd_device *mdd = mdo2mdd(obj);
1002         int rc;
1003         ENTRY;
1004
1005         handle = mdd_trans_create(env, mdd);
1006         if (IS_ERR(handle))
1007                 RETURN(PTR_ERR(handle));
1008
1009         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1010         if (rc)
1011                 GOTO(stop, rc);
1012
1013         rc = mdd_trans_start(env, mdd, handle);
1014         if (rc)
1015                 GOTO(stop, rc);
1016
1017         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1018                                       handle);
1019
1020 stop:
1021         mdd_trans_stop(env, mdd, rc, handle);
1022
1023         RETURN(rc);
1024 }
1025
1026 /**
1027  * Save LMA extended attributes with data from \a ma.
1028  *
1029  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1030  * not, LMA EA will be first read from disk, modified and write back.
1031  *
1032  */
1033 /* Precedence for choosing record type when multiple
1034  * attributes change: setattr > mtime > ctime > atime
1035  * (ctime changes when mtime does, plus chmod/chown.
1036  * atime and ctime are independent.) */
1037 static int mdd_attr_set_changelog(const struct lu_env *env,
1038                                   struct md_object *obj, struct thandle *handle,
1039                                   __u64 valid)
1040 {
1041         struct mdd_device *mdd = mdo2mdd(obj);
1042         int bits, type = 0;
1043
1044         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1045         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1046         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1047         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1048         bits = bits & mdd->mdd_cl.mc_mask;
1049         if (bits == 0)
1050                 return 0;
1051
1052         /* The record type is the lowest non-masked set bit */
1053         while (bits && ((bits & 1) == 0)) {
1054                 bits = bits >> 1;
1055                 type++;
1056         }
1057
1058         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1059         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1060                                         md2mdd_obj(obj), handle);
1061 }
1062
1063 static int mdd_declare_attr_set(const struct lu_env *env,
1064                                 struct mdd_device *mdd,
1065                                 struct mdd_object *obj,
1066                                 const struct lu_attr *attr,
1067                                 struct thandle *handle)
1068 {
1069         int rc;
1070
1071         rc = mdo_declare_attr_set(env, obj, attr, handle);
1072         if (rc)
1073                 return rc;
1074
1075 #ifdef CONFIG_FS_POSIX_ACL
1076         if (attr->la_valid & LA_MODE) {
1077                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1078                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1079                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1080                 mdd_read_unlock(env, obj);
1081                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1082                         rc = 0;
1083                 else if (rc < 0)
1084                         return rc;
1085
1086                 if (rc != 0) {
1087                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1088                         rc = mdo_declare_xattr_set(env, obj, buf,
1089                                                    XATTR_NAME_ACL_ACCESS, 0,
1090                                                    handle);
1091                         if (rc)
1092                                 return rc;
1093                 }
1094         }
1095 #endif
1096
1097         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1098         return rc;
1099 }
1100
1101 /* set attr and LOV EA at once, return updated attr */
1102 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1103                  const struct md_attr *ma)
1104 {
1105         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1106         struct mdd_device *mdd = mdo2mdd(obj);
1107         struct thandle *handle;
1108         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1109         const struct lu_attr *la = &ma->ma_attr;
1110         int rc;
1111         ENTRY;
1112
1113         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1114         LASSERT((ma->ma_valid & MA_LOV) == 0);
1115         LASSERT((ma->ma_valid & MA_HSM) == 0);
1116         LASSERT((ma->ma_valid & MA_SOM) == 0);
1117
1118         *la_copy = ma->ma_attr;
1119         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1120         if (rc)
1121                 RETURN(rc);
1122
1123         /* setattr on "close" only change atime, or do nothing */
1124         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1125                 RETURN(0);
1126
1127         handle = mdd_trans_create(env, mdd);
1128         if (IS_ERR(handle))
1129                 RETURN(PTR_ERR(handle));
1130
1131         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1132         if (rc)
1133                 GOTO(stop, rc);
1134
1135         rc = mdd_trans_start(env, mdd, handle);
1136         if (rc)
1137                 GOTO(stop, rc);
1138
1139         /* permission changes may require sync operation */
1140         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1141                 handle->th_sync |= !!mdd->mdd_sync_permission;
1142
1143         if (la->la_valid & (LA_MTIME | LA_CTIME))
1144                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1145                        la->la_mtime, la->la_ctime);
1146
1147         if (la_copy->la_valid & LA_FLAGS) {
1148                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1149                 if (rc == 0)
1150                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1151         } else if (la_copy->la_valid) {            /* setattr */
1152                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1153         }
1154
1155         if (rc == 0)
1156                 rc = mdd_attr_set_changelog(env, obj, handle,
1157                                             la->la_valid);
1158 stop:
1159         mdd_trans_stop(env, mdd, rc, handle);
1160         RETURN(rc);
1161 }
1162
1163 static int mdd_xattr_sanity_check(const struct lu_env *env,
1164                                   struct mdd_object *obj)
1165 {
1166         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1167         struct lu_ucred *uc     = lu_ucred_assert(env);
1168         int rc;
1169         ENTRY;
1170
1171         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1172                 RETURN(-EPERM);
1173
1174         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1175         if (rc)
1176                 RETURN(rc);
1177
1178         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1179             !mdd_capable(uc, CFS_CAP_FOWNER))
1180                 RETURN(-EPERM);
1181
1182         RETURN(rc);
1183 }
1184
1185 static int mdd_declare_xattr_set(const struct lu_env *env,
1186                                  struct mdd_device *mdd,
1187                                  struct mdd_object *obj,
1188                                  const struct lu_buf *buf,
1189                                  const char *name,
1190                                  int fl, struct thandle *handle)
1191 {
1192         int     rc;
1193
1194         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1195         if (rc)
1196                 return rc;
1197
1198         /* Only record user xattr changes */
1199         if ((strncmp("user.", name, 5) == 0)) {
1200                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1201                 if (rc)
1202                         return rc;
1203         }
1204
1205         /* If HSM data is modified, this could add a changelog */
1206         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
1207                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1208
1209         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1210         return rc;
1211 }
1212
1213 /*
1214  * Compare current and future data of HSM EA and add a changelog if needed.
1215  *
1216  * Caller should have write-locked \param obj.
1217  *
1218  * \param buf - Future HSM EA content.
1219  * \retval 0 if no changelog is needed or changelog was added properly.
1220  * \retval -ve errno if there was a problem
1221  */
1222 static int mdd_hsm_update_locked(const struct lu_env *env,
1223                                  struct md_object *obj,
1224                                  const struct lu_buf *buf,
1225                                  struct thandle *handle)
1226 {
1227         struct mdd_thread_info *info = mdd_env_info(env);
1228         struct mdd_device      *mdd = mdo2mdd(obj);
1229         struct mdd_object      *mdd_obj = md2mdd_obj(obj);
1230         struct lu_buf          *current_buf = &info->mti_buf;
1231         struct md_hsm          *current_mh;
1232         struct md_hsm          *new_mh;
1233         int                     rc;
1234         ENTRY;
1235
1236         OBD_ALLOC_PTR(current_mh);
1237         if (current_mh == NULL)
1238                 RETURN(-ENOMEM);
1239
1240         /* Read HSM attrs from disk */
1241         current_buf->lb_buf = info->mti_xattr_buf;
1242         current_buf->lb_len = sizeof(info->mti_xattr_buf);
1243         CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
1244         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
1245                            mdd_object_capa(env, mdd_obj));
1246         rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
1247         if (rc < 0 && rc != -ENODATA)
1248                 GOTO(free, rc);
1249         else if (rc == -ENODATA)
1250                 current_mh->mh_flags = 0;
1251
1252         /* Map future HSM xattr */
1253         OBD_ALLOC_PTR(new_mh);
1254         if (new_mh == NULL)
1255                 GOTO(free, rc = -ENOMEM);
1256         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1257
1258         /* If HSM flags are different, add a changelog */
1259         rc = 0;
1260         if (current_mh->mh_flags != new_mh->mh_flags) {
1261                 int flags = 0;
1262                 hsm_set_cl_event(&flags, HE_STATE);
1263                 if (new_mh->mh_flags & HS_DIRTY)
1264                         hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
1265
1266                 rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
1267                                               handle);
1268         }
1269
1270         OBD_FREE_PTR(new_mh);
1271 free:
1272         OBD_FREE_PTR(current_mh);
1273         return(rc);
1274 }
1275
1276
1277 /**
1278  * The caller should guarantee to update the object ctime
1279  * after xattr_set if needed.
1280  */
1281 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1282                          const struct lu_buf *buf, const char *name,
1283                          int fl)
1284 {
1285         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1286         struct mdd_device       *mdd = mdo2mdd(obj);
1287         struct thandle          *handle;
1288         int                      rc;
1289         ENTRY;
1290
1291         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1292                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1293                 RETURN(rc);
1294         }
1295
1296         rc = mdd_xattr_sanity_check(env, mdd_obj);
1297         if (rc)
1298                 RETURN(rc);
1299
1300         handle = mdd_trans_create(env, mdd);
1301         if (IS_ERR(handle))
1302                 RETURN(PTR_ERR(handle));
1303
1304         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
1305         if (rc)
1306                 GOTO(stop, rc);
1307
1308         rc = mdd_trans_start(env, mdd, handle);
1309         if (rc)
1310                 GOTO(stop, rc);
1311
1312         /* security-replated changes may require sync */
1313         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1314                 handle->th_sync |= !!mdd->mdd_sync_permission;
1315
1316         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1317
1318         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
1319                 rc = mdd_hsm_update_locked(env, obj, buf, handle);
1320                 if (rc) {
1321                         mdd_write_unlock(env, mdd_obj);
1322                         GOTO(stop, rc);
1323                 }
1324         }
1325
1326         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1327                            mdd_object_capa(env, mdd_obj));
1328         mdd_write_unlock(env, mdd_obj);
1329         if (rc)
1330                 GOTO(stop, rc);
1331
1332         /* Only record system & user xattr changes */
1333         if (strncmp(XATTR_USER_PREFIX, name,
1334                         sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1335             strncmp(POSIX_ACL_XATTR_ACCESS, name,
1336                         sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1337             strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1338                         sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1339                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1340                                               handle);
1341
1342 stop:
1343         mdd_trans_stop(env, mdd, rc, handle);
1344
1345         RETURN(rc);
1346 }
1347
1348 static int mdd_declare_xattr_del(const struct lu_env *env,
1349                                  struct mdd_device *mdd,
1350                                  struct mdd_object *obj,
1351                                  const char *name,
1352                                  struct thandle *handle)
1353 {
1354         int rc;
1355
1356         rc = mdo_declare_xattr_del(env, obj, name, handle);
1357         if (rc)
1358                 return rc;
1359
1360         /* Only record user xattr changes */
1361         if ((strncmp("user.", name, 5) == 0))
1362                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1363
1364         return rc;
1365 }
1366
1367 /**
1368  * The caller should guarantee to update the object ctime
1369  * after xattr_set if needed.
1370  */
1371 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1372                   const char *name)
1373 {
1374         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1375         struct mdd_device *mdd = mdo2mdd(obj);
1376         struct thandle *handle;
1377         int  rc;
1378         ENTRY;
1379
1380         rc = mdd_xattr_sanity_check(env, mdd_obj);
1381         if (rc)
1382                 RETURN(rc);
1383
1384         handle = mdd_trans_create(env, mdd);
1385         if (IS_ERR(handle))
1386                 RETURN(PTR_ERR(handle));
1387
1388         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1389         if (rc)
1390                 GOTO(stop, rc);
1391
1392         rc = mdd_trans_start(env, mdd, handle);
1393         if (rc)
1394                 GOTO(stop, rc);
1395
1396         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1397         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1398                            mdd_object_capa(env, mdd_obj));
1399         mdd_write_unlock(env, mdd_obj);
1400         if (rc)
1401                 GOTO(stop, rc);
1402
1403         /* Only record system & user xattr changes */
1404         if (strncmp(XATTR_USER_PREFIX, name,
1405                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1406                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1407                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1408                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1409                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1410                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1411                                               handle);
1412
1413 stop:
1414         mdd_trans_stop(env, mdd, rc, handle);
1415
1416         RETURN(rc);
1417 }
1418
1419 /*
1420  * read lov EA of an object
1421  * return the lov EA in an allocated lu_buf
1422  */
1423 static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
1424                                      struct mdd_object *obj)
1425 {
1426         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1427         struct lu_buf   *lmm_buf = NULL;
1428         int              rc, sz;
1429         ENTRY;
1430
1431 repeat:
1432         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
1433                            mdd_object_capa(env, obj));
1434         if (rc < 0)
1435                 GOTO(out, rc);
1436
1437         if (rc == 0)
1438                 GOTO(out, rc = -ENODATA);
1439
1440         sz = rc;
1441         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1442                 /* mti_big_buf was not allocated, so we have to
1443                  * allocate it based on the ea size */
1444                 buf = mdd_buf_alloc(env, sz);
1445                 if (buf->lb_buf == NULL)
1446                         GOTO(out, rc = -ENOMEM);
1447                 goto repeat;
1448         }
1449
1450         OBD_ALLOC_PTR(lmm_buf);
1451         if (!lmm_buf)
1452                 GOTO(out, rc = -ENOMEM);
1453
1454         OBD_ALLOC(lmm_buf->lb_buf, sz);
1455         if (!lmm_buf->lb_buf)
1456                 GOTO(free, rc = -ENOMEM);
1457
1458         memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
1459         lmm_buf->lb_len = sz;
1460
1461         GOTO(out, rc = 0);
1462
1463 free:
1464         if (lmm_buf)
1465                 OBD_FREE_PTR(lmm_buf);
1466 out:
1467         if (rc)
1468                 return ERR_PTR(rc);
1469         return lmm_buf;
1470 }
1471
1472
1473 /*
1474  *  check if layout swapping between 2 objects is allowed
1475  *  the rules are:
1476  *  - same type of objects
1477  *  - same owner/group (so quotas are still valid)
1478  */
1479 static int mdd_layout_swap_allowed(const struct lu_env *env,
1480                                    struct mdd_object *o1,
1481                                    struct mdd_object *o2)
1482 {
1483         const struct lu_fid     *fid1, *fid2;
1484         __u32                    uid, gid;
1485         struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
1486         int                      rc;
1487         ENTRY;
1488
1489         fid1 = mdo2fid(o1);
1490         fid2 = mdo2fid(o2);
1491
1492         if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
1493             (mdd_object_type(o1) != mdd_object_type(o2)))
1494                 RETURN(-EPERM);
1495
1496         tmp_la->la_valid = 0;
1497         rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
1498         if (rc)
1499                 RETURN(rc);
1500         uid = tmp_la->la_uid;
1501         gid = tmp_la->la_gid;
1502
1503         tmp_la->la_valid = 0;
1504         rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
1505         if (rc)
1506                 RETURN(rc);
1507
1508         if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
1509                 RETURN(-EPERM);
1510
1511         RETURN(0);
1512 }
1513
1514 /**
1515  * swap layouts between 2 lustre objects
1516  */
1517 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1518                             struct md_object *obj2, __u64 flags)
1519 {
1520         struct mdd_object       *o1, *o2, *fst_o, *snd_o;
1521         struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
1522         struct lu_buf           *fst_buf, *snd_buf;
1523         struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
1524         struct thandle          *handle;
1525         struct mdd_device       *mdd = mdo2mdd(obj1);
1526         int                      rc;
1527         __u16                    fst_gen, snd_gen;
1528         ENTRY;
1529
1530         /* we have to sort the 2 obj, so locking will always
1531          * be in the same order, even in case of 2 concurrent swaps */
1532         rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
1533                        mdo2fid(md2mdd_obj(obj2)));
1534         /* same fid ? */
1535         if (rc == 0)
1536                 RETURN(-EPERM);
1537
1538         if (rc > 0) {
1539                 o1 = md2mdd_obj(obj1);
1540                 o2 = md2mdd_obj(obj2);
1541         } else {
1542                 o1 = md2mdd_obj(obj2);
1543                 o2 = md2mdd_obj(obj1);
1544         }
1545
1546         /* check if layout swapping is allowed */
1547         rc = mdd_layout_swap_allowed(env, o1, o2);
1548         if (rc)
1549                 RETURN(rc);
1550
1551         handle = mdd_trans_create(env, mdd);
1552         if (IS_ERR(handle))
1553                 RETURN(PTR_ERR(handle));
1554
1555         /* objects are already sorted */
1556         mdd_write_lock(env, o1, MOR_TGT_CHILD);
1557         mdd_write_lock(env, o2, MOR_TGT_CHILD);
1558
1559         lmm1_buf = mdd_get_lov_ea(env, o1);
1560         if (IS_ERR(lmm1_buf)) {
1561                 rc = PTR_ERR(lmm1_buf);
1562                 lmm1_buf = NULL;
1563                 if (rc != -ENODATA)
1564                         GOTO(unlock, rc);
1565         }
1566
1567         lmm2_buf = mdd_get_lov_ea(env, o2);
1568         if (IS_ERR(lmm2_buf)) {
1569                 rc = PTR_ERR(lmm2_buf);
1570                 lmm2_buf = NULL;
1571                 if (rc != -ENODATA)
1572                         GOTO(unlock, rc);
1573         }
1574
1575         /* swapping 2 non existant layouts is a success */
1576         if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
1577                 GOTO(unlock, rc = 0);
1578
1579         /* to help inode migration between MDT, it is better to
1580          * start by the no layout file (if one), so we order the swap */
1581         if (lmm1_buf == NULL) {
1582                 fst_o = o1;
1583                 fst_buf = lmm1_buf;
1584                 snd_o = o2;
1585                 snd_buf = lmm2_buf;
1586         } else {
1587                 fst_o = o2;
1588                 fst_buf = lmm2_buf;
1589                 snd_o = o1;
1590                 snd_buf = lmm1_buf;
1591         }
1592
1593         /* lmm and generation layout initialization */
1594         if (fst_buf) {
1595                 fst_lmm = fst_buf->lb_buf;
1596                 fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
1597         } else {
1598                 fst_lmm = NULL;
1599                 fst_gen = 0;
1600         }
1601
1602         if (snd_buf) {
1603                 snd_lmm = snd_buf->lb_buf;
1604                 snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
1605         } else {
1606                 snd_lmm = NULL;
1607                 snd_gen = 0;
1608         }
1609
1610         /* save the orignal lmm common header of first file
1611          * to be able to roll back */
1612         OBD_ALLOC_PTR(old_fst_lmm);
1613         if (old_fst_lmm == NULL)
1614                 GOTO(unlock, rc = -ENOMEM);
1615
1616         memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
1617
1618         /* increase the generation layout numbers */
1619         snd_gen++;
1620         fst_gen++;
1621
1622         /* set the file specific informations in lmm */
1623         if (fst_lmm) {
1624                 fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
1625                 fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
1626                 fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
1627         }
1628
1629         if (snd_lmm) {
1630                 snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
1631                 snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1632                 snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1633         }
1634
1635         /* prepare transaction */
1636         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
1637                                    LU_XATTR_REPLACE, handle);
1638         if (rc)
1639                 GOTO(stop, rc);
1640
1641         rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
1642                                    LU_XATTR_REPLACE, handle);
1643         if (rc)
1644                 GOTO(stop, rc);
1645
1646         rc = mdd_trans_start(env, mdd, handle);
1647         if (rc)
1648                 GOTO(stop, rc);
1649
1650         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
1651                            LU_XATTR_REPLACE, handle,
1652                            mdd_object_capa(env, fst_o));
1653         if (rc)
1654                 GOTO(stop, rc);
1655
1656         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
1657                            LU_XATTR_REPLACE, handle,
1658                            mdd_object_capa(env, snd_o));
1659         if (rc) {
1660                 int     rc2;
1661
1662                 /* failure on second file, but first was done, so we have
1663                  * to roll back first */
1664                 /* restore object_id, object_seq and generation number
1665                  * on first file */
1666                 if (fst_lmm) {
1667                         fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1668                         fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1669                         fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
1670                 }
1671
1672                 rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
1673                                     LU_XATTR_REPLACE, handle,
1674                                     mdd_object_capa(env, fst_o));
1675                 if (rc2) {
1676                         /* very bad day */
1677                         CERROR("%s: unable to roll back after swap layouts"
1678                                " failure between "DFID" and "DFID
1679                                " rc2 = %d rc = %d)\n",
1680                                mdd2obd_dev(mdd)->obd_name,
1681                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
1682                                rc2, rc);
1683                         /* a solution to avoid journal commit is to panic,
1684                          * but it has strong consequences so we use LBUG to
1685                          * allow sysdamin to choose to panic or not
1686                          */
1687                         LBUG();
1688                 }
1689                 GOTO(stop, rc);
1690         }
1691         EXIT;
1692
1693 stop:
1694         mdd_trans_stop(env, mdd, rc, handle);
1695 unlock:
1696         mdd_write_unlock(env, o2);
1697         mdd_write_unlock(env, o1);
1698
1699         if (lmm1_buf && lmm1_buf->lb_buf)
1700                 OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
1701         if (lmm1_buf)
1702                 OBD_FREE_PTR(lmm1_buf);
1703
1704         if (lmm2_buf && lmm2_buf->lb_buf)
1705                 OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
1706         if (lmm2_buf)
1707                 OBD_FREE_PTR(lmm2_buf);
1708
1709         if (old_fst_lmm)
1710                 OBD_FREE_PTR(old_fst_lmm);
1711
1712         return rc;
1713 }
1714
1715 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1716                 struct mdd_object *child, struct lu_attr *attr)
1717 {
1718         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1719         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1720         struct dt_object *nc = mdd_object_child(child);
1721
1722         /* @hint will be initialized by underlying device. */
1723         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1724 }
1725
1726 /*
1727  * do NOT or the MAY_*'s, you'll get the weakest
1728  */
1729 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1730 {
1731         int res = 0;
1732
1733         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1734          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1735          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1736          * owner can write to a file even if it is marked readonly to hide
1737          * its brokenness. (bug 5781) */
1738         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1739                 struct lu_ucred *uc = lu_ucred_check(env);
1740
1741                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1742                         return 0;
1743         }
1744
1745         if (flags & FMODE_READ)
1746                 res |= MAY_READ;
1747         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1748                 res |= MAY_WRITE;
1749         if (flags & MDS_FMODE_EXEC)
1750                 res = MAY_EXEC;
1751         return res;
1752 }
1753
1754 static int mdd_open_sanity_check(const struct lu_env *env,
1755                                  struct mdd_object *obj, int flag)
1756 {
1757         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1758         int mode, rc;
1759         ENTRY;
1760
1761         /* EEXIST check */
1762         if (mdd_is_dead_obj(obj))
1763                 RETURN(-ENOENT);
1764
1765         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1766         if (rc)
1767                RETURN(rc);
1768
1769         if (S_ISLNK(tmp_la->la_mode))
1770                 RETURN(-ELOOP);
1771
1772         mode = accmode(env, tmp_la, flag);
1773
1774         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1775                 RETURN(-EISDIR);
1776
1777         if (!(flag & MDS_OPEN_CREATED)) {
1778                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1779                 if (rc)
1780                         RETURN(rc);
1781         }
1782
1783         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1784             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1785                 flag &= ~MDS_OPEN_TRUNC;
1786
1787         /* For writing append-only file must open it with append mode. */
1788         if (mdd_is_append(obj)) {
1789                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1790                         RETURN(-EPERM);
1791                 if (flag & MDS_OPEN_TRUNC)
1792                         RETURN(-EPERM);
1793         }
1794
1795 #if 0
1796         /*
1797          * Now, flag -- O_NOATIME does not be packed by client.
1798          */
1799         if (flag & O_NOATIME) {
1800                 struct lu_ucred *uc = lu_ucred(env);
1801
1802                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1803                            (uc->uc_valid == UCRED_NEW)) &&
1804                     (uc->uc_fsuid != tmp_la->la_uid) &&
1805                     !mdd_capable(uc, CFS_CAP_FOWNER))
1806                         RETURN(-EPERM);
1807         }
1808 #endif
1809
1810         RETURN(0);
1811 }
1812
1813 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1814                     int flags)
1815 {
1816         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1817         int rc = 0;
1818
1819         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1820
1821         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1822         if (rc == 0)
1823                 mdd_obj->mod_count++;
1824
1825         mdd_write_unlock(env, mdd_obj);
1826         return rc;
1827 }
1828
1829 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1830                             struct md_attr *ma, struct thandle *handle)
1831 {
1832         return mdo_declare_destroy(env, obj, handle);
1833 }
1834
1835 /* return md_attr back,
1836  * if it is last unlink then return lov ea + llog cookie*/
1837 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1838                     struct md_attr *ma, struct thandle *handle)
1839 {
1840         int rc;
1841         ENTRY;
1842
1843         rc = mdo_destroy(env, obj, handle);
1844
1845         RETURN(rc);
1846 }
1847
1848 static int mdd_declare_close(const struct lu_env *env,
1849                              struct mdd_object *obj,
1850                              struct md_attr *ma,
1851                              struct thandle *handle)
1852 {
1853         int rc;
1854
1855         rc = orph_declare_index_delete(env, obj, handle);
1856         if (rc)
1857                 return rc;
1858
1859         return mdo_declare_destroy(env, obj, handle);
1860 }
1861
1862 /*
1863  * No permission check is needed.
1864  */
1865 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1866                      struct md_attr *ma, int mode)
1867 {
1868         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1869         struct mdd_device *mdd = mdo2mdd(obj);
1870         struct thandle    *handle = NULL;
1871         int rc, is_orphan = 0;
1872         ENTRY;
1873
1874         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1875                 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1876                 mdd_obj->mod_count--;
1877                 mdd_write_unlock(env, mdd_obj);
1878
1879                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1880                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1881                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1882                 RETURN(0);
1883         }
1884
1885         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
1886          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
1887         /* check without any lock */
1888         is_orphan = mdd_obj->mod_count == 1 &&
1889                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
1890
1891 again:
1892         if (is_orphan) {
1893                 handle = mdd_trans_create(env, mdo2mdd(obj));
1894                 if (IS_ERR(handle))
1895                         RETURN(PTR_ERR(handle));
1896
1897                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1898                 if (rc)
1899                         GOTO(stop, rc);
1900
1901                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1902                 if (rc)
1903                         GOTO(stop, rc);
1904
1905                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1906                 if (rc)
1907                         GOTO(stop, rc);
1908         }
1909
1910         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1911         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1912                         mdd_object_capa(env, mdd_obj));
1913         if (rc != 0) {
1914                 CERROR("Failed to get lu_attr of "DFID": %d\n",
1915                        PFID(mdd_object_fid(mdd_obj)), rc);
1916                 GOTO(out, rc);
1917         }
1918
1919         /* check again with lock */
1920         is_orphan = (mdd_obj->mod_count == 1) &&
1921                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
1922                      ma->ma_attr.la_nlink == 0);
1923
1924         if (is_orphan && handle == NULL) {
1925                 mdd_write_unlock(env, mdd_obj);
1926                 goto again;
1927         }
1928
1929         mdd_obj->mod_count--; /*release open count */
1930
1931         if (!is_orphan)
1932                 GOTO(out, rc = 0);
1933
1934         /* Orphan object */
1935         /* NB: Object maybe not in orphan list originally, it is rare case for
1936          * mdd_finish_unlink() failure, in that case, the object doesn't have
1937          * ORPHAN_OBJ flag */
1938         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1939                 /* remove link to object from orphan index */
1940                 LASSERT(handle != NULL);
1941                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1942                 if (rc != 0) {
1943                         CERROR("%s: unable to delete "DFID" from orphan list: "
1944                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1945                                PFID(mdd_object_fid(mdd_obj)), rc);
1946                         /* If object was not deleted from orphan list, do not
1947                          * destroy OSS objects, which will be done when next
1948                          * recovery. */
1949                         GOTO(out, rc);
1950                 }
1951
1952                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1953                        "list, OSS objects to be destroyed.\n",
1954                        PFID(mdd_object_fid(mdd_obj)));
1955         }
1956
1957         rc = mdo_destroy(env, mdd_obj, handle);
1958
1959         if (rc != 0) {
1960                 CERROR("%s: unable to delete "DFID" from orphan list: "
1961                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1962                        PFID(mdd_object_fid(mdd_obj)), rc);
1963         }
1964         EXIT;
1965
1966 out:
1967         mdd_write_unlock(env, mdd_obj);
1968
1969         if (rc == 0 &&
1970             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1971             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1972                 if (handle == NULL) {
1973                         handle = mdd_trans_create(env, mdo2mdd(obj));
1974                         if (IS_ERR(handle))
1975                                 GOTO(stop, rc = IS_ERR(handle));
1976
1977                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1978                                                          handle);
1979                         if (rc)
1980                                 GOTO(stop, rc);
1981
1982                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1983                         if (rc)
1984                                 GOTO(stop, rc);
1985                 }
1986
1987                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1988                                          mdd_obj, handle);
1989         }
1990
1991 stop:
1992         if (handle != NULL)
1993                 mdd_trans_stop(env, mdd, rc, handle);
1994         return rc;
1995 }
1996
1997 /*
1998  * Permission check is done when open,
1999  * no need check again.
2000  */
2001 static int mdd_readpage_sanity_check(const struct lu_env *env,
2002                                      struct mdd_object *obj)
2003 {
2004         struct dt_object *next = mdd_object_child(obj);
2005         int rc;
2006         ENTRY;
2007
2008         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2009                 rc = 0;
2010         else
2011                 rc = -ENOTDIR;
2012
2013         RETURN(rc);
2014 }
2015
2016 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2017                               int nob, const struct dt_it_ops *iops,
2018                               struct dt_it *it, __u32 attr, void *arg)
2019 {
2020         struct lu_dirpage       *dp = &lp->lp_dir;
2021         void                    *area = dp;
2022         int                      result;
2023         __u64                    hash = 0;
2024         struct lu_dirent        *ent;
2025         struct lu_dirent        *last = NULL;
2026         int                      first = 1;
2027
2028         memset(area, 0, sizeof (*dp));
2029         area += sizeof (*dp);
2030         nob  -= sizeof (*dp);
2031
2032         ent  = area;
2033         do {
2034                 int    len;
2035                 int    recsize;
2036
2037                 len = iops->key_size(env, it);
2038
2039                 /* IAM iterator can return record with zero len. */
2040                 if (len == 0)
2041                         goto next;
2042
2043                 hash = iops->store(env, it);
2044                 if (unlikely(first)) {
2045                         first = 0;
2046                         dp->ldp_hash_start = cpu_to_le64(hash);
2047                 }
2048
2049                 /* calculate max space required for lu_dirent */
2050                 recsize = lu_dirent_calc_size(len, attr);
2051
2052                 if (nob >= recsize) {
2053                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2054                         if (result == -ESTALE)
2055                                 goto next;
2056                         if (result != 0)
2057                                 goto out;
2058
2059                         /* osd might not able to pack all attributes,
2060                          * so recheck rec length */
2061                         recsize = le16_to_cpu(ent->lde_reclen);
2062                 } else {
2063                         result = (last != NULL) ? 0 :-EINVAL;
2064                         goto out;
2065                 }
2066                 last = ent;
2067                 ent = (void *)ent + recsize;
2068                 nob -= recsize;
2069
2070 next:
2071                 result = iops->next(env, it);
2072                 if (result == -ESTALE)
2073                         goto next;
2074         } while (result == 0);
2075
2076 out:
2077         dp->ldp_hash_end = cpu_to_le64(hash);
2078         if (last != NULL) {
2079                 if (last->lde_hash == dp->ldp_hash_end)
2080                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2081                 last->lde_reclen = 0; /* end mark */
2082         }
2083         if (result > 0)
2084                 /* end of directory */
2085                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2086         else if (result < 0)
2087                 CWARN("build page failed: %d!\n", result);
2088         return result;
2089 }
2090
2091 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2092                  const struct lu_rdpg *rdpg)
2093 {
2094         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2095         int rc;
2096         ENTRY;
2097
2098         if (mdd_object_exists(mdd_obj) == 0) {
2099                 CERROR("%s: object "DFID" not found: rc = -2\n",
2100                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2101                 return -ENOENT;
2102         }
2103
2104         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2105         rc = mdd_readpage_sanity_check(env, mdd_obj);
2106         if (rc)
2107                 GOTO(out_unlock, rc);
2108
2109         if (mdd_is_dead_obj(mdd_obj)) {
2110                 struct page *pg;
2111                 struct lu_dirpage *dp;
2112
2113                 /*
2114                  * According to POSIX, please do not return any entry to client:
2115                  * even dot and dotdot should not be returned.
2116                  */
2117                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2118                        PFID(mdd_object_fid(mdd_obj)));
2119
2120                 if (rdpg->rp_count <= 0)
2121                         GOTO(out_unlock, rc = -EFAULT);
2122                 LASSERT(rdpg->rp_pages != NULL);
2123
2124                 pg = rdpg->rp_pages[0];
2125                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2126                 memset(dp, 0 , sizeof(struct lu_dirpage));
2127                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2128                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2129                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2130                 cfs_kunmap(pg);
2131                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2132         }
2133
2134         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2135                            mdd_dir_page_build, NULL);
2136         if (rc >= 0) {
2137                 struct lu_dirpage       *dp;
2138
2139                 dp = cfs_kmap(rdpg->rp_pages[0]);
2140                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2141                 if (rc == 0) {
2142                         /*
2143                          * No pages were processed, mark this for first page
2144                          * and send back.
2145                          */
2146                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2147                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2148                 }
2149                 cfs_kunmap(rdpg->rp_pages[0]);
2150         }
2151
2152         GOTO(out_unlock, rc);
2153 out_unlock:
2154         mdd_read_unlock(env, mdd_obj);
2155         return rc;
2156 }
2157
2158 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2159 {
2160         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2161
2162         if (mdd_object_exists(mdd_obj) == 0) {
2163                 CERROR("%s: object "DFID" not found: rc = -2\n",
2164                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2165                 return -ENOENT;
2166         }
2167         return dt_object_sync(env, mdd_object_child(mdd_obj));
2168 }
2169
2170 const struct md_object_operations mdd_obj_ops = {
2171         .moo_permission         = mdd_permission,
2172         .moo_attr_get           = mdd_attr_get,
2173         .moo_attr_set           = mdd_attr_set,
2174         .moo_xattr_get          = mdd_xattr_get,
2175         .moo_xattr_set          = mdd_xattr_set,
2176         .moo_xattr_list         = mdd_xattr_list,
2177         .moo_xattr_del          = mdd_xattr_del,
2178         .moo_swap_layouts       = mdd_swap_layouts,
2179         .moo_open               = mdd_open,
2180         .moo_close              = mdd_close,
2181         .moo_readpage           = mdd_readpage,
2182         .moo_readlink           = mdd_readlink,
2183         .moo_changelog          = mdd_changelog,
2184         .moo_capa_get           = mdd_capa_get,
2185         .moo_object_sync        = mdd_object_sync,
2186         .moo_path               = mdd_path,
2187 };