Whamcloud - gitweb
1937db408115d5a199f09eb4e54d612198fce142
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         *buf = LU_BUF_NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 *buf = LU_BUF_NULL;
146         }
147         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         *buf = LU_BUF_NULL;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct lu_object *mdd_object_alloc(const struct lu_env *env,
183                                    const struct lu_object_header *hdr,
184                                    struct lu_device *d)
185 {
186         struct mdd_object *mdd_obj;
187
188         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
189         if (mdd_obj != NULL) {
190                 struct lu_object *o;
191
192                 o = mdd2lu_obj(mdd_obj);
193                 lu_object_init(o, NULL, d);
194                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
195                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
196                 mdd_obj->mod_count = 0;
197                 o->lo_ops = &mdd_lu_obj_ops;
198                 return o;
199         } else {
200                 return NULL;
201         }
202 }
203
204 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
205                            const struct lu_object_conf *unused)
206 {
207         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
208         struct mdd_object *mdd_obj = lu2mdd_obj(o);
209         struct lu_object  *below;
210         struct lu_device  *under;
211         ENTRY;
212
213         mdd_obj->mod_cltime = 0;
214         under = &d->mdd_child->dd_lu_dev;
215         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
216         mdd_pdlock_init(mdd_obj);
217         if (below == NULL)
218                 RETURN(-ENOMEM);
219
220         lu_object_add(o, below);
221
222         RETURN(0);
223 }
224
225 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
226 {
227         if (lu_object_exists(o))
228                 return mdd_get_flags(env, lu2mdd_obj(o));
229         else
230                 return 0;
231 }
232
233 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
234 {
235         struct mdd_object *mdd = lu2mdd_obj(o);
236
237         lu_object_fini(o);
238         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
239 }
240
241 static int mdd_object_print(const struct lu_env *env, void *cookie,
242                             lu_printer_t p, const struct lu_object *o)
243 {
244         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
245         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
246                     "valid=%x, cltime="LPU64", flags=%lx)",
247                     mdd, mdd->mod_count, mdd->mod_valid,
248                     mdd->mod_cltime, mdd->mod_flags);
249 }
250
251 static const struct lu_object_operations mdd_lu_obj_ops = {
252         .loo_object_init    = mdd_object_init,
253         .loo_object_start   = mdd_object_start,
254         .loo_object_free    = mdd_object_free,
255         .loo_object_print   = mdd_object_print,
256 };
257
258 struct mdd_object *mdd_object_find(const struct lu_env *env,
259                                    struct mdd_device *d,
260                                    const struct lu_fid *f)
261 {
262         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
263 }
264
265 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
266                         const char *path, struct lu_fid *fid)
267 {
268         struct lu_buf *buf;
269         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
270         struct mdd_object *obj;
271         struct lu_name *lname = &mdd_env_info(env)->mti_name;
272         char *name;
273         int rc = 0;
274         ENTRY;
275
276         /* temp buffer for path element */
277         buf = mdd_buf_alloc(env, PATH_MAX);
278         if (buf->lb_buf == NULL)
279                 RETURN(-ENOMEM);
280
281         lname->ln_name = name = buf->lb_buf;
282         lname->ln_namelen = 0;
283         *f = mdd->mdd_root_fid;
284
285         while(1) {
286                 while (*path == '/')
287                         path++;
288                 if (*path == '\0')
289                         break;
290                 while (*path != '/' && *path != '\0') {
291                         *name = *path;
292                         path++;
293                         name++;
294                         lname->ln_namelen++;
295                 }
296
297                 *name = '\0';
298                 /* find obj corresponding to fid */
299                 obj = mdd_object_find(env, mdd, f);
300                 if (obj == NULL)
301                         GOTO(out, rc = -EREMOTE);
302                 if (IS_ERR(obj))
303                         GOTO(out, rc = PTR_ERR(obj));
304                 /* get child fid from parent and name */
305                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
306                 mdd_object_put(env, obj);
307                 if (rc)
308                         break;
309
310                 name = buf->lb_buf;
311                 lname->ln_namelen = 0;
312         }
313
314         if (!rc)
315                 *fid = *f;
316 out:
317         RETURN(rc);
318 }
319
320 /** The maximum depth that fid2path() will search.
321  * This is limited only because we want to store the fids for
322  * historical path lookup purposes.
323  */
324 #define MAX_PATH_DEPTH 100
325
326 /** mdd_path() lookup structure. */
327 struct path_lookup_info {
328         __u64                pli_recno;        /**< history point */
329         __u64                pli_currec;       /**< current record */
330         struct lu_fid        pli_fid;
331         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
332         struct mdd_object   *pli_mdd_obj;
333         char                *pli_path;         /**< full path */
334         int                  pli_pathlen;
335         int                  pli_linkno;       /**< which hardlink to follow */
336         int                  pli_fidcount;     /**< number of \a pli_fids */
337 };
338
339 static int mdd_path_current(const struct lu_env *env,
340                             struct path_lookup_info *pli)
341 {
342         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
343         struct mdd_object *mdd_obj;
344         struct lu_buf     *buf = NULL;
345         struct link_ea_header *leh;
346         struct link_ea_entry  *lee;
347         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
348         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
349         char *ptr;
350         int reclen;
351         int rc;
352         ENTRY;
353
354         ptr = pli->pli_path + pli->pli_pathlen - 1;
355         *ptr = 0;
356         --ptr;
357         pli->pli_fidcount = 0;
358         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
359
360         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
361                 mdd_obj = mdd_object_find(env, mdd,
362                                           &pli->pli_fids[pli->pli_fidcount]);
363                 if (mdd_obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(mdd_obj))
366                         GOTO(out, rc = PTR_ERR(mdd_obj));
367                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
368                 if (rc <= 0) {
369                         mdd_object_put(env, mdd_obj);
370                         if (rc == -1)
371                                 rc = -EREMOTE;
372                         else if (rc == 0)
373                                 /* Do I need to error out here? */
374                                 rc = -ENOENT;
375                         GOTO(out, rc);
376                 }
377
378                 /* Get parent fid and object name */
379                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
380                 buf = mdd_links_get(env, mdd_obj);
381                 mdd_read_unlock(env, mdd_obj);
382                 mdd_object_put(env, mdd_obj);
383                 if (IS_ERR(buf))
384                         GOTO(out, rc = PTR_ERR(buf));
385
386                 leh = buf->lb_buf;
387                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
388                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
389
390                 /* If set, use link #linkno for path lookup, otherwise use
391                    link #0.  Only do this for the final path element. */
392                 if ((pli->pli_fidcount == 0) &&
393                     (pli->pli_linkno < leh->leh_reccount)) {
394                         int count;
395                         for (count = 0; count < pli->pli_linkno; count++) {
396                                 lee = (struct link_ea_entry *)
397                                      ((char *)lee + reclen);
398                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
399                         }
400                         if (pli->pli_linkno < leh->leh_reccount - 1)
401                                 /* indicate to user there are more links */
402                                 pli->pli_linkno++;
403                 }
404
405                 /* Pack the name in the end of the buffer */
406                 ptr -= tmpname->ln_namelen;
407                 if (ptr - 1 <= pli->pli_path)
408                         GOTO(out, rc = -EOVERFLOW);
409                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
410                 *(--ptr) = '/';
411
412                 /* Store the parent fid for historic lookup */
413                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
414                         GOTO(out, rc = -EOVERFLOW);
415                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
416         }
417
418         /* Verify that our path hasn't changed since we started the lookup.
419            Record the current index, and verify the path resolves to the
420            same fid. If it does, then the path is correct as of this index. */
421         spin_lock(&mdd->mdd_cl.mc_lock);
422         pli->pli_currec = mdd->mdd_cl.mc_index;
423         spin_unlock(&mdd->mdd_cl.mc_lock);
424         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
425         if (rc) {
426                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
427                 GOTO (out, rc = -EAGAIN);
428         }
429         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
430                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
431                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
432                        PFID(&pli->pli_fid));
433                 GOTO(out, rc = -EAGAIN);
434         }
435         ptr++; /* skip leading / */
436         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
437
438         EXIT;
439 out:
440         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
441                 /* if we vmalloced a large buffer drop it */
442                 mdd_buf_put(buf);
443
444         return rc;
445 }
446
447 static int mdd_path_historic(const struct lu_env *env,
448                              struct path_lookup_info *pli)
449 {
450         return 0;
451 }
452
453 /* Returns the full path to this fid, as of changelog record recno. */
454 static int mdd_path(const struct lu_env *env, struct md_object *obj,
455                     char *path, int pathlen, __u64 *recno, int *linkno)
456 {
457         struct path_lookup_info *pli;
458         int tries = 3;
459         int rc = -EAGAIN;
460         ENTRY;
461
462         if (pathlen < 3)
463                 RETURN(-EOVERFLOW);
464
465         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
466                 path[0] = '\0';
467                 RETURN(0);
468         }
469
470         OBD_ALLOC_PTR(pli);
471         if (pli == NULL)
472                 RETURN(-ENOMEM);
473
474         pli->pli_mdd_obj = md2mdd_obj(obj);
475         pli->pli_recno = *recno;
476         pli->pli_path = path;
477         pli->pli_pathlen = pathlen;
478         pli->pli_linkno = *linkno;
479
480         /* Retry multiple times in case file is being moved */
481         while (tries-- && rc == -EAGAIN)
482                 rc = mdd_path_current(env, pli);
483
484         /* For historical path lookup, the current links may not have existed
485          * at "recno" time.  We must switch over to earlier links/parents
486          * by using the changelog records.  If the earlier parent doesn't
487          * exist, we must search back through the changelog to reconstruct
488          * its parents, then check if it exists, etc.
489          * We may ignore this problem for the initial implementation and
490          * state that an "original" hardlink must still exist for us to find
491          * historic path name. */
492         if (pli->pli_recno != -1) {
493                 rc = mdd_path_historic(env, pli);
494         } else {
495                 *recno = pli->pli_currec;
496                 /* Return next link index to caller */
497                 *linkno = pli->pli_linkno;
498         }
499
500         OBD_FREE_PTR(pli);
501
502         RETURN (rc);
503 }
504
505 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
506 {
507         struct lu_attr *la = &mdd_env_info(env)->mti_la;
508         int rc;
509
510         ENTRY;
511         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
512         if (rc == 0) {
513                 mdd_flags_xlate(obj, la->la_flags);
514         }
515         RETURN(rc);
516 }
517
518 /*
519  * No permission check is needed.
520  */
521 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
522                  struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526
527         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
528                           mdd_object_capa(env, md2mdd_obj(obj)));
529         RETURN(rc);
530 }
531
532 /*
533  * No permission check is needed.
534  */
535 static int mdd_xattr_get(const struct lu_env *env,
536                          struct md_object *obj, struct lu_buf *buf,
537                          const char *name)
538 {
539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
540         int rc;
541
542         ENTRY;
543
544         if (mdd_object_exists(mdd_obj) == 0) {
545                 CERROR("%s: object "DFID" not found: rc = -2\n",
546                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
547                 return -ENOENT;
548         }
549
550         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
551         rc = mdo_xattr_get(env, mdd_obj, buf, name,
552                            mdd_object_capa(env, mdd_obj));
553         mdd_read_unlock(env, mdd_obj);
554
555         RETURN(rc);
556 }
557
558 /*
559  * Permission check is done when open,
560  * no need check again.
561  */
562 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
563                         struct lu_buf *buf)
564 {
565         struct mdd_object *mdd_obj = md2mdd_obj(obj);
566         struct dt_object  *next;
567         loff_t             pos = 0;
568         int                rc;
569         ENTRY;
570
571         if (mdd_object_exists(mdd_obj) == 0) {
572                 CERROR("%s: object "DFID" not found: rc = -2\n",
573                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
574                 return -ENOENT;
575         }
576
577         next = mdd_object_child(mdd_obj);
578         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
579         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
580                                          mdd_object_capa(env, mdd_obj));
581         mdd_read_unlock(env, mdd_obj);
582         RETURN(rc);
583 }
584
585 /*
586  * No permission check is needed.
587  */
588 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
589                           struct lu_buf *buf)
590 {
591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
592         int rc;
593
594         ENTRY;
595
596         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
597         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
598         mdd_read_unlock(env, mdd_obj);
599
600         RETURN(rc);
601 }
602
603 int mdd_declare_object_create_internal(const struct lu_env *env,
604                                        struct mdd_object *p,
605                                        struct mdd_object *c,
606                                        struct lu_attr *attr,
607                                        struct thandle *handle,
608                                        const struct md_op_spec *spec)
609 {
610         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
611         const struct dt_index_features *feat = spec->sp_feat;
612         int rc;
613         ENTRY;
614
615         if (feat != &dt_directory_features && feat != NULL) {
616                 dof->dof_type = DFT_INDEX;
617                 dof->u.dof_idx.di_feat = feat;
618
619         } else {
620                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
621                 if (dof->dof_type == DFT_REGULAR) {
622                         dof->u.dof_reg.striped =
623                                 md_should_create(spec->sp_cr_flags);
624                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
625                                 dof->u.dof_reg.striped = 0;
626                         /* is this replay? */
627                         if (spec->no_create)
628                                 dof->u.dof_reg.striped = 0;
629                 }
630         }
631
632         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
633
634         RETURN(rc);
635 }
636
637 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
638                                struct mdd_object *c, struct lu_attr *attr,
639                                struct thandle *handle,
640                                const struct md_op_spec *spec)
641 {
642         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
643         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
644         int rc;
645         ENTRY;
646
647         LASSERT(!mdd_object_exists(c));
648
649         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
650
651         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
652
653         RETURN(rc);
654 }
655
656 /**
657  * Make sure the ctime is increased only.
658  */
659 static inline int mdd_attr_check(const struct lu_env *env,
660                                  struct mdd_object *obj,
661                                  struct lu_attr *attr)
662 {
663         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
664         int rc;
665         ENTRY;
666
667         if (attr->la_valid & LA_CTIME) {
668                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
669                 if (rc)
670                         RETURN(rc);
671
672                 if (attr->la_ctime < tmp_la->la_ctime)
673                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
674                 else if (attr->la_valid == LA_CTIME &&
675                          attr->la_ctime == tmp_la->la_ctime)
676                         attr->la_valid &= ~LA_CTIME;
677         }
678         RETURN(0);
679 }
680
681 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
682                           struct lu_attr *attr, struct thandle *handle,
683                           int needacl)
684 {
685         int rc;
686         ENTRY;
687
688         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
689 #ifdef CONFIG_FS_POSIX_ACL
690         if (!rc && (attr->la_valid & LA_MODE) && needacl)
691                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
692 #endif
693         RETURN(rc);
694 }
695
696 int mdd_attr_check_set_internal(const struct lu_env *env,
697                                 struct mdd_object *obj, struct lu_attr *attr,
698                                 struct thandle *handle, int needacl)
699 {
700         int rc;
701         ENTRY;
702
703         rc = mdd_attr_check(env, obj, attr);
704         if (rc)
705                 RETURN(rc);
706
707         if (attr->la_valid)
708                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
709         RETURN(rc);
710 }
711
712 /*
713  * This gives the same functionality as the code between
714  * sys_chmod and inode_setattr
715  * chown_common and inode_setattr
716  * utimes and inode_setattr
717  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
718  */
719 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
720                         struct lu_attr *la, const unsigned long flags)
721 {
722         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
723         struct lu_ucred  *uc;
724         int               rc;
725         ENTRY;
726
727         if (!la->la_valid)
728                 RETURN(0);
729
730         /* Do not permit change file type */
731         if (la->la_valid & LA_TYPE)
732                 RETURN(-EPERM);
733
734         /* They should not be processed by setattr */
735         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
736                 RETURN(-EPERM);
737
738         /* export destroy does not have ->le_ses, but we may want
739          * to drop LUSTRE_SOM_FL. */
740         uc = lu_ucred_check(env);
741         if (uc == NULL)
742                 RETURN(0);
743
744         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
745         if (rc)
746                 RETURN(rc);
747
748         if (la->la_valid == LA_CTIME) {
749                 if (!(flags & MDS_PERM_BYPASS))
750                         /* This is only for set ctime when rename's source is
751                          * on remote MDS. */
752                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
753                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
754                         la->la_valid &= ~LA_CTIME;
755                 RETURN(rc);
756         }
757
758         if (la->la_valid == LA_ATIME) {
759                 /* This is atime only set for read atime update on close. */
760                 if (la->la_atime >= tmp_la->la_atime &&
761                     la->la_atime < (tmp_la->la_atime +
762                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
763                         la->la_valid &= ~LA_ATIME;
764                 RETURN(0);
765         }
766
767         /* Check if flags change. */
768         if (la->la_valid & LA_FLAGS) {
769                 unsigned int oldflags = 0;
770                 unsigned int newflags = la->la_flags &
771                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
772
773                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
774                     !mdd_capable(uc, CFS_CAP_FOWNER))
775                         RETURN(-EPERM);
776
777                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
778                  * only be changed by the relevant capability. */
779                 if (mdd_is_immutable(obj))
780                         oldflags |= LUSTRE_IMMUTABLE_FL;
781                 if (mdd_is_append(obj))
782                         oldflags |= LUSTRE_APPEND_FL;
783                 if ((oldflags ^ newflags) &&
784                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
785                         RETURN(-EPERM);
786
787                 if (!S_ISDIR(tmp_la->la_mode))
788                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
789         }
790
791         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
792             (la->la_valid & ~LA_FLAGS) &&
793             !(flags & MDS_PERM_BYPASS))
794                 RETURN(-EPERM);
795
796         /* Check for setting the obj time. */
797         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
798             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
799                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
800                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
801                         rc = mdd_permission_internal(env, obj, tmp_la,
802                                                      MAY_WRITE);
803                         if (rc)
804                                 RETURN(rc);
805                 }
806         }
807
808         if (la->la_valid & LA_KILL_SUID) {
809                 la->la_valid &= ~LA_KILL_SUID;
810                 if ((tmp_la->la_mode & S_ISUID) &&
811                     !(la->la_valid & LA_MODE)) {
812                         la->la_mode = tmp_la->la_mode;
813                         la->la_valid |= LA_MODE;
814                 }
815                 la->la_mode &= ~S_ISUID;
816         }
817
818         if (la->la_valid & LA_KILL_SGID) {
819                 la->la_valid &= ~LA_KILL_SGID;
820                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
821                                         (S_ISGID | S_IXGRP)) &&
822                     !(la->la_valid & LA_MODE)) {
823                         la->la_mode = tmp_la->la_mode;
824                         la->la_valid |= LA_MODE;
825                 }
826                 la->la_mode &= ~S_ISGID;
827         }
828
829         /* Make sure a caller can chmod. */
830         if (la->la_valid & LA_MODE) {
831                 if (!(flags & MDS_PERM_BYPASS) &&
832                     (uc->uc_fsuid != tmp_la->la_uid) &&
833                     !mdd_capable(uc, CFS_CAP_FOWNER))
834                         RETURN(-EPERM);
835
836                 if (la->la_mode == (cfs_umode_t) -1)
837                         la->la_mode = tmp_la->la_mode;
838                 else
839                         la->la_mode = (la->la_mode & S_IALLUGO) |
840                                       (tmp_la->la_mode & ~S_IALLUGO);
841
842                 /* Also check the setgid bit! */
843                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
844                                        la->la_gid : tmp_la->la_gid) &&
845                     !mdd_capable(uc, CFS_CAP_FSETID))
846                         la->la_mode &= ~S_ISGID;
847         } else {
848                la->la_mode = tmp_la->la_mode;
849         }
850
851         /* Make sure a caller can chown. */
852         if (la->la_valid & LA_UID) {
853                 if (la->la_uid == (uid_t) -1)
854                         la->la_uid = tmp_la->la_uid;
855                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
856                      (la->la_uid != tmp_la->la_uid)) &&
857                     !mdd_capable(uc, CFS_CAP_CHOWN))
858                         RETURN(-EPERM);
859
860                 /* If the user or group of a non-directory has been
861                  * changed by a non-root user, remove the setuid bit.
862                  * 19981026 David C Niemi <niemi@tux.org>
863                  *
864                  * Changed this to apply to all users, including root,
865                  * to avoid some races. This is the behavior we had in
866                  * 2.0. The check for non-root was definitely wrong
867                  * for 2.2 anyway, as it should have been using
868                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
869                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
870                     !S_ISDIR(tmp_la->la_mode)) {
871                         la->la_mode &= ~S_ISUID;
872                         la->la_valid |= LA_MODE;
873                 }
874         }
875
876         /* Make sure caller can chgrp. */
877         if (la->la_valid & LA_GID) {
878                 if (la->la_gid == (gid_t) -1)
879                         la->la_gid = tmp_la->la_gid;
880                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
881                      ((la->la_gid != tmp_la->la_gid) &&
882                       !lustre_in_group_p(uc, la->la_gid))) &&
883                     !mdd_capable(uc, CFS_CAP_CHOWN))
884                         RETURN(-EPERM);
885
886                 /* Likewise, if the user or group of a non-directory
887                  * has been changed by a non-root user, remove the
888                  * setgid bit UNLESS there is no group execute bit
889                  * (this would be a file marked for mandatory
890                  * locking).  19981026 David C Niemi <niemi@tux.org>
891                  *
892                  * Removed the fsuid check (see the comment above) --
893                  * 19990830 SD. */
894                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
895                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
896                         la->la_mode &= ~S_ISGID;
897                         la->la_valid |= LA_MODE;
898                 }
899         }
900
901         /* For both Size-on-MDS case and truncate case,
902          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
903          * We distinguish them by "flags & MDS_SOM".
904          * For SOM case, it is true, the MAY_WRITE perm has been checked
905          * when open, no need check again. For truncate case, it is false,
906          * the MAY_WRITE perm should be checked here. */
907         if (flags & MDS_SOM) {
908                 /* For the "Size-on-MDS" setattr update, merge coming
909                  * attributes with the set in the inode. BUG 10641 */
910                 if ((la->la_valid & LA_ATIME) &&
911                     (la->la_atime <= tmp_la->la_atime))
912                         la->la_valid &= ~LA_ATIME;
913
914                 /* OST attributes do not have a priority over MDS attributes,
915                  * so drop times if ctime is equal. */
916                 if ((la->la_valid & LA_CTIME) &&
917                     (la->la_ctime <= tmp_la->la_ctime))
918                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
919         } else {
920                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
921                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
922                               (uc->uc_fsuid == tmp_la->la_uid)) &&
923                             !(flags & MDS_PERM_BYPASS)) {
924                                 rc = mdd_permission_internal(env, obj,
925                                                              tmp_la, MAY_WRITE);
926                                 if (rc)
927                                         RETURN(rc);
928                         }
929                 }
930                 if (la->la_valid & LA_CTIME) {
931                         /* The pure setattr, it has the priority over what is
932                          * already set, do not drop it if ctime is equal. */
933                         if (la->la_ctime < tmp_la->la_ctime)
934                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
935                                                   LA_CTIME);
936                 }
937         }
938
939         RETURN(0);
940 }
941
942 /** Store a data change changelog record
943  * If this fails, we must fail the whole transaction; we don't
944  * want the change to commit without the log entry.
945  * \param mdd_obj - mdd_object of change
946  * \param handle - transacion handle
947  */
948 static int mdd_changelog_data_store(const struct lu_env *env,
949                                     struct mdd_device *mdd,
950                                     enum changelog_rec_type type,
951                                     int flags, struct mdd_object *mdd_obj,
952                                     struct thandle *handle)
953 {
954         const struct lu_fid             *tfid = mdo2fid(mdd_obj);
955         struct llog_changelog_rec       *rec;
956         struct lu_buf                   *buf;
957         int                              reclen;
958         int                              rc;
959
960         /* Not recording */
961         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
962                 RETURN(0);
963         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
964                 RETURN(0);
965
966         LASSERT(mdd_obj != NULL);
967         LASSERT(handle != NULL);
968
969         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
970             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
971                 /* Don't need multiple updates in this log */
972                 /* Don't check under lock - no big deal if we get an extra
973                    entry */
974                 RETURN(0);
975         }
976
977         reclen = llog_data_len(sizeof(*rec));
978         buf = mdd_buf_alloc(env, reclen);
979         if (buf->lb_buf == NULL)
980                 RETURN(-ENOMEM);
981         rec = buf->lb_buf;
982
983         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
984         rec->cr.cr_type = (__u32)type;
985         rec->cr.cr_tfid = *tfid;
986         rec->cr.cr_namelen = 0;
987         mdd_obj->mod_cltime = cfs_time_current_64();
988
989         rc = mdd_changelog_store(env, mdd, rec, handle);
990
991         RETURN(rc);
992 }
993
994 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
995                   int flags, struct md_object *obj)
996 {
997         struct thandle *handle;
998         struct mdd_object *mdd_obj = md2mdd_obj(obj);
999         struct mdd_device *mdd = mdo2mdd(obj);
1000         int rc;
1001         ENTRY;
1002
1003         handle = mdd_trans_create(env, mdd);
1004         if (IS_ERR(handle))
1005                 RETURN(PTR_ERR(handle));
1006
1007         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1008         if (rc)
1009                 GOTO(stop, rc);
1010
1011         rc = mdd_trans_start(env, mdd, handle);
1012         if (rc)
1013                 GOTO(stop, rc);
1014
1015         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1016                                       handle);
1017
1018 stop:
1019         mdd_trans_stop(env, mdd, rc, handle);
1020
1021         RETURN(rc);
1022 }
1023
1024 /**
1025  * Save LMA extended attributes with data from \a ma.
1026  *
1027  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1028  * not, LMA EA will be first read from disk, modified and write back.
1029  *
1030  */
1031 /* Precedence for choosing record type when multiple
1032  * attributes change: setattr > mtime > ctime > atime
1033  * (ctime changes when mtime does, plus chmod/chown.
1034  * atime and ctime are independent.) */
1035 static int mdd_attr_set_changelog(const struct lu_env *env,
1036                                   struct md_object *obj, struct thandle *handle,
1037                                   __u64 valid)
1038 {
1039         struct mdd_device *mdd = mdo2mdd(obj);
1040         int bits, type = 0;
1041
1042         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1043         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1044         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1045         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1046         bits = bits & mdd->mdd_cl.mc_mask;
1047         if (bits == 0)
1048                 return 0;
1049
1050         /* The record type is the lowest non-masked set bit */
1051         while (bits && ((bits & 1) == 0)) {
1052                 bits = bits >> 1;
1053                 type++;
1054         }
1055
1056         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1057         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1058                                         md2mdd_obj(obj), handle);
1059 }
1060
1061 static int mdd_declare_attr_set(const struct lu_env *env,
1062                                 struct mdd_device *mdd,
1063                                 struct mdd_object *obj,
1064                                 const struct lu_attr *attr,
1065                                 struct thandle *handle)
1066 {
1067         int rc;
1068
1069         rc = mdo_declare_attr_set(env, obj, attr, handle);
1070         if (rc)
1071                 return rc;
1072
1073 #ifdef CONFIG_FS_POSIX_ACL
1074         if (attr->la_valid & LA_MODE) {
1075                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1076                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1077                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1078                 mdd_read_unlock(env, obj);
1079                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1080                         rc = 0;
1081                 else if (rc < 0)
1082                         return rc;
1083
1084                 if (rc != 0) {
1085                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1086                         rc = mdo_declare_xattr_set(env, obj, buf,
1087                                                    XATTR_NAME_ACL_ACCESS, 0,
1088                                                    handle);
1089                         if (rc)
1090                                 return rc;
1091                 }
1092         }
1093 #endif
1094
1095         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1096         return rc;
1097 }
1098
1099 /* set attr and LOV EA at once, return updated attr */
1100 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1101                  const struct md_attr *ma)
1102 {
1103         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1104         struct mdd_device *mdd = mdo2mdd(obj);
1105         struct thandle *handle;
1106         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1107         const struct lu_attr *la = &ma->ma_attr;
1108         int rc;
1109         ENTRY;
1110
1111         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1112         LASSERT((ma->ma_valid & MA_LOV) == 0);
1113         LASSERT((ma->ma_valid & MA_HSM) == 0);
1114         LASSERT((ma->ma_valid & MA_SOM) == 0);
1115
1116         *la_copy = ma->ma_attr;
1117         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1118         if (rc)
1119                 RETURN(rc);
1120
1121         /* setattr on "close" only change atime, or do nothing */
1122         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1123                 RETURN(0);
1124
1125         handle = mdd_trans_create(env, mdd);
1126         if (IS_ERR(handle))
1127                 RETURN(PTR_ERR(handle));
1128
1129         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1130         if (rc)
1131                 GOTO(stop, rc);
1132
1133         rc = mdd_trans_start(env, mdd, handle);
1134         if (rc)
1135                 GOTO(stop, rc);
1136
1137         /* permission changes may require sync operation */
1138         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1139                 handle->th_sync |= !!mdd->mdd_sync_permission;
1140
1141         if (la->la_valid & (LA_MTIME | LA_CTIME))
1142                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1143                        la->la_mtime, la->la_ctime);
1144
1145         if (la_copy->la_valid & LA_FLAGS) {
1146                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1147                 if (rc == 0)
1148                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1149         } else if (la_copy->la_valid) {            /* setattr */
1150                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1151         }
1152
1153         if (rc == 0)
1154                 rc = mdd_attr_set_changelog(env, obj, handle,
1155                                             la->la_valid);
1156 stop:
1157         mdd_trans_stop(env, mdd, rc, handle);
1158         RETURN(rc);
1159 }
1160
1161 static int mdd_xattr_sanity_check(const struct lu_env *env,
1162                                   struct mdd_object *obj)
1163 {
1164         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1165         struct lu_ucred *uc     = lu_ucred_assert(env);
1166         int rc;
1167         ENTRY;
1168
1169         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1170                 RETURN(-EPERM);
1171
1172         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1173         if (rc)
1174                 RETURN(rc);
1175
1176         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1177             !mdd_capable(uc, CFS_CAP_FOWNER))
1178                 RETURN(-EPERM);
1179
1180         RETURN(rc);
1181 }
1182
1183 static int mdd_declare_xattr_set(const struct lu_env *env,
1184                                  struct mdd_device *mdd,
1185                                  struct mdd_object *obj,
1186                                  const struct lu_buf *buf,
1187                                  const char *name,
1188                                  int fl, struct thandle *handle)
1189 {
1190         int     rc;
1191
1192         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1193         if (rc)
1194                 return rc;
1195
1196         /* Only record user xattr changes */
1197         if ((strncmp("user.", name, 5) == 0))
1198                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1199
1200         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1201         return rc;
1202 }
1203
1204 /**
1205  * The caller should guarantee to update the object ctime
1206  * after xattr_set if needed.
1207  */
1208 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1209                          const struct lu_buf *buf, const char *name,
1210                          int fl)
1211 {
1212         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1213         struct mdd_device       *mdd = mdo2mdd(obj);
1214         struct thandle          *handle;
1215         int                      rc;
1216         ENTRY;
1217
1218         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1219                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1220                 RETURN(rc);
1221         }
1222
1223         rc = mdd_xattr_sanity_check(env, mdd_obj);
1224         if (rc)
1225                 RETURN(rc);
1226
1227         handle = mdd_trans_create(env, mdd);
1228         if (IS_ERR(handle))
1229                 RETURN(PTR_ERR(handle));
1230
1231         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
1232         if (rc)
1233                 GOTO(stop, rc);
1234
1235         rc = mdd_trans_start(env, mdd, handle);
1236         if (rc)
1237                 GOTO(stop, rc);
1238
1239         /* security-replated changes may require sync */
1240         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1241                 handle->th_sync |= !!mdd->mdd_sync_permission;
1242
1243         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1244         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1245                            mdd_object_capa(env, mdd_obj));
1246         mdd_write_unlock(env, mdd_obj);
1247         if (rc)
1248                 GOTO(stop, rc);
1249
1250         /* Only record system & user xattr changes */
1251         if (strncmp(XATTR_USER_PREFIX, name,
1252                         sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1253             strncmp(POSIX_ACL_XATTR_ACCESS, name,
1254                         sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1255             strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1256                         sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1257                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1258                                               handle);
1259
1260 stop:
1261         mdd_trans_stop(env, mdd, rc, handle);
1262
1263         RETURN(rc);
1264 }
1265
1266 static int mdd_declare_xattr_del(const struct lu_env *env,
1267                                  struct mdd_device *mdd,
1268                                  struct mdd_object *obj,
1269                                  const char *name,
1270                                  struct thandle *handle)
1271 {
1272         int rc;
1273
1274         rc = mdo_declare_xattr_del(env, obj, name, handle);
1275         if (rc)
1276                 return rc;
1277
1278         /* Only record user xattr changes */
1279         if ((strncmp("user.", name, 5) == 0))
1280                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1281
1282         return rc;
1283 }
1284
1285 /**
1286  * The caller should guarantee to update the object ctime
1287  * after xattr_set if needed.
1288  */
1289 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1290                   const char *name)
1291 {
1292         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1293         struct mdd_device *mdd = mdo2mdd(obj);
1294         struct thandle *handle;
1295         int  rc;
1296         ENTRY;
1297
1298         rc = mdd_xattr_sanity_check(env, mdd_obj);
1299         if (rc)
1300                 RETURN(rc);
1301
1302         handle = mdd_trans_create(env, mdd);
1303         if (IS_ERR(handle))
1304                 RETURN(PTR_ERR(handle));
1305
1306         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1307         if (rc)
1308                 GOTO(stop, rc);
1309
1310         rc = mdd_trans_start(env, mdd, handle);
1311         if (rc)
1312                 GOTO(stop, rc);
1313
1314         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1315         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1316                            mdd_object_capa(env, mdd_obj));
1317         mdd_write_unlock(env, mdd_obj);
1318         if (rc)
1319                 GOTO(stop, rc);
1320
1321         /* Only record system & user xattr changes */
1322         if (strncmp(XATTR_USER_PREFIX, name,
1323                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1324                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1325                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1326                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1327                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1328                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1329                                               handle);
1330
1331 stop:
1332         mdd_trans_stop(env, mdd, rc, handle);
1333
1334         RETURN(rc);
1335 }
1336
1337 /*
1338  * read lov EA of an object
1339  * return the lov EA in an allocated lu_buf
1340  */
1341 static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
1342                                      struct mdd_object *obj)
1343 {
1344         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1345         struct lu_buf   *lmm_buf = NULL;
1346         int              rc, sz;
1347         ENTRY;
1348
1349 repeat:
1350         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
1351                            mdd_object_capa(env, obj));
1352         if (rc < 0)
1353                 GOTO(out, rc);
1354
1355         if (rc == 0)
1356                 GOTO(out, rc = -ENODATA);
1357
1358         sz = rc;
1359         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1360                 /* mti_big_buf was not allocated, so we have to
1361                  * allocate it based on the ea size */
1362                 buf = mdd_buf_alloc(env, sz);
1363                 if (buf->lb_buf == NULL)
1364                         GOTO(out, rc = -ENOMEM);
1365                 goto repeat;
1366         }
1367
1368         OBD_ALLOC_PTR(lmm_buf);
1369         if (!lmm_buf)
1370                 GOTO(out, rc = -ENOMEM);
1371
1372         OBD_ALLOC(lmm_buf->lb_buf, sz);
1373         if (!lmm_buf->lb_buf)
1374                 GOTO(free, rc = -ENOMEM);
1375
1376         memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
1377         lmm_buf->lb_len = sz;
1378
1379         GOTO(out, rc = 0);
1380
1381 free:
1382         if (lmm_buf)
1383                 OBD_FREE_PTR(lmm_buf);
1384 out:
1385         if (rc)
1386                 return ERR_PTR(rc);
1387         return lmm_buf;
1388 }
1389
1390
1391 /*
1392  *  check if layout swapping between 2 objects is allowed
1393  *  the rules are:
1394  *  - same type of objects
1395  *  - same owner/group (so quotas are still valid)
1396  */
1397 static int mdd_layout_swap_allowed(const struct lu_env *env,
1398                                    struct mdd_object *o1,
1399                                    struct mdd_object *o2)
1400 {
1401         const struct lu_fid     *fid1, *fid2;
1402         __u32                    uid, gid;
1403         struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
1404         int                      rc;
1405         ENTRY;
1406
1407         fid1 = mdo2fid(o1);
1408         fid2 = mdo2fid(o2);
1409
1410         if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
1411             (mdd_object_type(o1) != mdd_object_type(o2)))
1412                 RETURN(-EPERM);
1413
1414         tmp_la->la_valid = 0;
1415         rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
1416         if (rc)
1417                 RETURN(rc);
1418         uid = tmp_la->la_uid;
1419         gid = tmp_la->la_gid;
1420
1421         tmp_la->la_valid = 0;
1422         rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
1423         if (rc)
1424                 RETURN(rc);
1425
1426         if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
1427                 RETURN(-EPERM);
1428
1429         RETURN(0);
1430 }
1431
1432 /**
1433  * swap layouts between 2 lustre objects
1434  */
1435 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1436                             struct md_object *obj2, __u64 flags)
1437 {
1438         struct mdd_object       *o1, *o2, *fst_o, *snd_o;
1439         struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
1440         struct lu_buf           *fst_buf, *snd_buf;
1441         struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
1442         struct thandle          *handle;
1443         struct mdd_device       *mdd = mdo2mdd(obj1);
1444         int                      rc;
1445         __u16                    fst_gen, snd_gen;
1446         ENTRY;
1447
1448         /* we have to sort the 2 obj, so locking will always
1449          * be in the same order, even in case of 2 concurrent swaps */
1450         rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
1451                        mdo2fid(md2mdd_obj(obj2)));
1452         /* same fid ? */
1453         if (rc == 0)
1454                 RETURN(-EPERM);
1455
1456         if (rc > 0) {
1457                 o1 = md2mdd_obj(obj1);
1458                 o2 = md2mdd_obj(obj2);
1459         } else {
1460                 o1 = md2mdd_obj(obj2);
1461                 o2 = md2mdd_obj(obj1);
1462         }
1463
1464         /* check if layout swapping is allowed */
1465         rc = mdd_layout_swap_allowed(env, o1, o2);
1466         if (rc)
1467                 RETURN(rc);
1468
1469         handle = mdd_trans_create(env, mdd);
1470         if (IS_ERR(handle))
1471                 RETURN(PTR_ERR(handle));
1472
1473         /* objects are already sorted */
1474         mdd_write_lock(env, o1, MOR_TGT_CHILD);
1475         mdd_write_lock(env, o2, MOR_TGT_CHILD);
1476
1477         lmm1_buf = mdd_get_lov_ea(env, o1);
1478         if (IS_ERR(lmm1_buf)) {
1479                 rc = PTR_ERR(lmm1_buf);
1480                 lmm1_buf = NULL;
1481                 if (rc != -ENODATA)
1482                         GOTO(unlock, rc);
1483         }
1484
1485         lmm2_buf = mdd_get_lov_ea(env, o2);
1486         if (IS_ERR(lmm2_buf)) {
1487                 rc = PTR_ERR(lmm2_buf);
1488                 lmm2_buf = NULL;
1489                 if (rc != -ENODATA)
1490                         GOTO(unlock, rc);
1491         }
1492
1493         /* swapping 2 non existant layouts is a success */
1494         if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
1495                 GOTO(unlock, rc = 0);
1496
1497         /* to help inode migration between MDT, it is better to
1498          * start by the no layout file (if one), so we order the swap */
1499         if (lmm1_buf == NULL) {
1500                 fst_o = o1;
1501                 fst_buf = lmm1_buf;
1502                 snd_o = o2;
1503                 snd_buf = lmm2_buf;
1504         } else {
1505                 fst_o = o2;
1506                 fst_buf = lmm2_buf;
1507                 snd_o = o1;
1508                 snd_buf = lmm1_buf;
1509         }
1510
1511         /* lmm and generation layout initialization */
1512         if (fst_buf) {
1513                 fst_lmm = fst_buf->lb_buf;
1514                 fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
1515         } else {
1516                 fst_lmm = NULL;
1517                 fst_gen = 0;
1518         }
1519
1520         if (snd_buf) {
1521                 snd_lmm = snd_buf->lb_buf;
1522                 snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
1523         } else {
1524                 snd_lmm = NULL;
1525                 snd_gen = 0;
1526         }
1527
1528         /* save the orignal lmm common header of first file
1529          * to be able to roll back */
1530         OBD_ALLOC_PTR(old_fst_lmm);
1531         if (old_fst_lmm == NULL)
1532                 GOTO(unlock, rc = -ENOMEM);
1533
1534         memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
1535
1536         /* increase the generation layout numbers */
1537         snd_gen++;
1538         fst_gen++;
1539
1540         /* set the file specific informations in lmm */
1541         if (fst_lmm) {
1542                 fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
1543                 fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
1544                 fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
1545         }
1546
1547         if (snd_lmm) {
1548                 snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
1549                 snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1550                 snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1551         }
1552
1553         /* prepare transaction */
1554         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
1555                                    LU_XATTR_REPLACE, handle);
1556         if (rc)
1557                 GOTO(stop, rc);
1558
1559         rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
1560                                    LU_XATTR_REPLACE, handle);
1561         if (rc)
1562                 GOTO(stop, rc);
1563
1564         rc = mdd_trans_start(env, mdd, handle);
1565         if (rc)
1566                 GOTO(stop, rc);
1567
1568         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
1569                            LU_XATTR_REPLACE, handle,
1570                            mdd_object_capa(env, fst_o));
1571         if (rc)
1572                 GOTO(stop, rc);
1573
1574         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
1575                            LU_XATTR_REPLACE, handle,
1576                            mdd_object_capa(env, snd_o));
1577         if (rc) {
1578                 int     rc2;
1579
1580                 /* failure on second file, but first was done, so we have
1581                  * to roll back first */
1582                 /* restore object_id, object_seq and generation number
1583                  * on first file */
1584                 if (fst_lmm) {
1585                         fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1586                         fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1587                         fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
1588                 }
1589
1590                 rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
1591                                     LU_XATTR_REPLACE, handle,
1592                                     mdd_object_capa(env, fst_o));
1593                 if (rc2) {
1594                         /* very bad day */
1595                         CERROR("%s: unable to roll back after swap layouts"
1596                                " failure between "DFID" and "DFID
1597                                " rc2 = %d rc = %d)\n",
1598                                mdd2obd_dev(mdd)->obd_name,
1599                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
1600                                rc2, rc);
1601                         /* a solution to avoid journal commit is to panic,
1602                          * but it has strong consequences so we use LBUG to
1603                          * allow sysdamin to choose to panic or not
1604                          */
1605                         LBUG();
1606                 }
1607                 GOTO(stop, rc);
1608         }
1609         EXIT;
1610
1611 stop:
1612         mdd_trans_stop(env, mdd, rc, handle);
1613 unlock:
1614         mdd_write_unlock(env, o2);
1615         mdd_write_unlock(env, o1);
1616
1617         if (lmm1_buf && lmm1_buf->lb_buf)
1618                 OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
1619         if (lmm1_buf)
1620                 OBD_FREE_PTR(lmm1_buf);
1621
1622         if (lmm2_buf && lmm2_buf->lb_buf)
1623                 OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
1624         if (lmm2_buf)
1625                 OBD_FREE_PTR(lmm2_buf);
1626
1627         if (old_fst_lmm)
1628                 OBD_FREE_PTR(old_fst_lmm);
1629
1630         return rc;
1631 }
1632
1633 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1634                 struct mdd_object *child, struct lu_attr *attr)
1635 {
1636         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1637         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1638         struct dt_object *nc = mdd_object_child(child);
1639
1640         /* @hint will be initialized by underlying device. */
1641         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1642 }
1643
1644 /*
1645  * do NOT or the MAY_*'s, you'll get the weakest
1646  */
1647 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1648 {
1649         int res = 0;
1650
1651         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1652          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1653          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1654          * owner can write to a file even if it is marked readonly to hide
1655          * its brokenness. (bug 5781) */
1656         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1657                 struct lu_ucred *uc = lu_ucred_check(env);
1658
1659                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1660                         return 0;
1661         }
1662
1663         if (flags & FMODE_READ)
1664                 res |= MAY_READ;
1665         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1666                 res |= MAY_WRITE;
1667         if (flags & MDS_FMODE_EXEC)
1668                 res = MAY_EXEC;
1669         return res;
1670 }
1671
1672 static int mdd_open_sanity_check(const struct lu_env *env,
1673                                  struct mdd_object *obj, int flag)
1674 {
1675         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1676         int mode, rc;
1677         ENTRY;
1678
1679         /* EEXIST check */
1680         if (mdd_is_dead_obj(obj))
1681                 RETURN(-ENOENT);
1682
1683         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1684         if (rc)
1685                RETURN(rc);
1686
1687         if (S_ISLNK(tmp_la->la_mode))
1688                 RETURN(-ELOOP);
1689
1690         mode = accmode(env, tmp_la, flag);
1691
1692         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1693                 RETURN(-EISDIR);
1694
1695         if (!(flag & MDS_OPEN_CREATED)) {
1696                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1697                 if (rc)
1698                         RETURN(rc);
1699         }
1700
1701         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1702             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1703                 flag &= ~MDS_OPEN_TRUNC;
1704
1705         /* For writing append-only file must open it with append mode. */
1706         if (mdd_is_append(obj)) {
1707                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1708                         RETURN(-EPERM);
1709                 if (flag & MDS_OPEN_TRUNC)
1710                         RETURN(-EPERM);
1711         }
1712
1713 #if 0
1714         /*
1715          * Now, flag -- O_NOATIME does not be packed by client.
1716          */
1717         if (flag & O_NOATIME) {
1718                 struct lu_ucred *uc = lu_ucred(env);
1719
1720                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1721                            (uc->uc_valid == UCRED_NEW)) &&
1722                     (uc->uc_fsuid != tmp_la->la_uid) &&
1723                     !mdd_capable(uc, CFS_CAP_FOWNER))
1724                         RETURN(-EPERM);
1725         }
1726 #endif
1727
1728         RETURN(0);
1729 }
1730
1731 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1732                     int flags)
1733 {
1734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1735         int rc = 0;
1736
1737         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1738
1739         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1740         if (rc == 0)
1741                 mdd_obj->mod_count++;
1742
1743         mdd_write_unlock(env, mdd_obj);
1744         return rc;
1745 }
1746
1747 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1748                             struct md_attr *ma, struct thandle *handle)
1749 {
1750         return mdo_declare_destroy(env, obj, handle);
1751 }
1752
1753 /* return md_attr back,
1754  * if it is last unlink then return lov ea + llog cookie*/
1755 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1756                     struct md_attr *ma, struct thandle *handle)
1757 {
1758         int rc;
1759         ENTRY;
1760
1761         rc = mdo_destroy(env, obj, handle);
1762
1763         RETURN(rc);
1764 }
1765
1766 static int mdd_declare_close(const struct lu_env *env,
1767                              struct mdd_object *obj,
1768                              struct md_attr *ma,
1769                              struct thandle *handle)
1770 {
1771         int rc;
1772
1773         rc = orph_declare_index_delete(env, obj, handle);
1774         if (rc)
1775                 return rc;
1776
1777         return mdo_declare_destroy(env, obj, handle);
1778 }
1779
1780 /*
1781  * No permission check is needed.
1782  */
1783 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1784                      struct md_attr *ma, int mode)
1785 {
1786         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1787         struct mdd_device *mdd = mdo2mdd(obj);
1788         struct thandle    *handle = NULL;
1789         int rc, is_orphan = 0;
1790         ENTRY;
1791
1792         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1793                 mdd_obj->mod_count--;
1794
1795                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1796                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1797                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1798                 RETURN(0);
1799         }
1800
1801         /* check without any lock */
1802         if (mdd_obj->mod_count == 1 &&
1803             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1804  again:
1805                 handle = mdd_trans_create(env, mdo2mdd(obj));
1806                 if (IS_ERR(handle))
1807                         RETURN(PTR_ERR(handle));
1808
1809                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1810                 if (rc)
1811                         GOTO(stop, rc);
1812
1813                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1814                 if (rc)
1815                         GOTO(stop, rc);
1816
1817                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1818                 if (rc)
1819                         GOTO(stop, rc);
1820         }
1821
1822         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1823         if (handle == NULL && mdd_obj->mod_count == 1 &&
1824             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1825                 mdd_write_unlock(env, mdd_obj);
1826                 goto again;
1827         }
1828
1829         /* release open count */
1830         mdd_obj->mod_count --;
1831
1832         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1833                 /* remove link to object from orphan index */
1834                 LASSERT(handle != NULL);
1835                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1836                 if (rc == 0) {
1837                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1838                                "list, OSS objects to be destroyed.\n",
1839                                PFID(mdd_object_fid(mdd_obj)));
1840                         is_orphan = 1;
1841                 } else {
1842                         CERROR("Object "DFID" can not be deleted from orphan "
1843                                 "list, maybe cause OST objects can not be "
1844                                 "destroyed (err: %d).\n",
1845                                 PFID(mdd_object_fid(mdd_obj)), rc);
1846                         /* If object was not deleted from orphan list, do not
1847                          * destroy OSS objects, which will be done when next
1848                          * recovery. */
1849                         GOTO(out, rc);
1850                 }
1851         }
1852
1853         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1854                         mdd_object_capa(env, mdd_obj));
1855         /* Object maybe not in orphan list originally, it is rare case for
1856          * mdd_finish_unlink() failure. */
1857         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
1858                 if (handle == NULL) {
1859                         handle = mdd_trans_create(env, mdo2mdd(obj));
1860                         if (IS_ERR(handle))
1861                                 GOTO(out, rc = PTR_ERR(handle));
1862
1863                         rc = mdo_declare_destroy(env, mdd_obj, handle);
1864                         if (rc)
1865                                 GOTO(out, rc);
1866
1867                         rc = mdd_declare_changelog_store(env, mdd,
1868                                         NULL, handle);
1869                         if (rc)
1870                                 GOTO(stop, rc);
1871
1872                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1873                         if (rc)
1874                                 GOTO(out, rc);
1875                 }
1876
1877                 rc = mdo_destroy(env, mdd_obj, handle);
1878
1879                 if (rc != 0)
1880                         CERROR("Error when prepare to delete Object "DFID" , "
1881                                "which will cause OST objects can not be "
1882                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1883         }
1884         EXIT;
1885
1886 out:
1887
1888         mdd_write_unlock(env, mdd_obj);
1889
1890         if (rc == 0 &&
1891             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1892             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1893                 if (handle == NULL) {
1894                         handle = mdd_trans_create(env, mdo2mdd(obj));
1895                         if (IS_ERR(handle))
1896                                 GOTO(stop, rc = IS_ERR(handle));
1897
1898                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1899                                                          handle);
1900                         if (rc)
1901                                 GOTO(stop, rc);
1902
1903                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1904                         if (rc)
1905                                 GOTO(stop, rc);
1906                 }
1907
1908                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1909                                          mdd_obj, handle);
1910         }
1911
1912 stop:
1913         if (handle != NULL)
1914                 mdd_trans_stop(env, mdd, rc, handle);
1915         return rc;
1916 }
1917
1918 /*
1919  * Permission check is done when open,
1920  * no need check again.
1921  */
1922 static int mdd_readpage_sanity_check(const struct lu_env *env,
1923                                      struct mdd_object *obj)
1924 {
1925         struct dt_object *next = mdd_object_child(obj);
1926         int rc;
1927         ENTRY;
1928
1929         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1930                 rc = 0;
1931         else
1932                 rc = -ENOTDIR;
1933
1934         RETURN(rc);
1935 }
1936
1937 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
1938                               int nob, const struct dt_it_ops *iops,
1939                               struct dt_it *it, __u32 attr, void *arg)
1940 {
1941         struct lu_dirpage       *dp = &lp->lp_dir;
1942         void                    *area = dp;
1943         int                      result;
1944         __u64                    hash = 0;
1945         struct lu_dirent        *ent;
1946         struct lu_dirent        *last = NULL;
1947         int                      first = 1;
1948
1949         memset(area, 0, sizeof (*dp));
1950         area += sizeof (*dp);
1951         nob  -= sizeof (*dp);
1952
1953         ent  = area;
1954         do {
1955                 int    len;
1956                 int    recsize;
1957
1958                 len = iops->key_size(env, it);
1959
1960                 /* IAM iterator can return record with zero len. */
1961                 if (len == 0)
1962                         goto next;
1963
1964                 hash = iops->store(env, it);
1965                 if (unlikely(first)) {
1966                         first = 0;
1967                         dp->ldp_hash_start = cpu_to_le64(hash);
1968                 }
1969
1970                 /* calculate max space required for lu_dirent */
1971                 recsize = lu_dirent_calc_size(len, attr);
1972
1973                 if (nob >= recsize) {
1974                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
1975                         if (result == -ESTALE)
1976                                 goto next;
1977                         if (result != 0)
1978                                 goto out;
1979
1980                         /* osd might not able to pack all attributes,
1981                          * so recheck rec length */
1982                         recsize = le16_to_cpu(ent->lde_reclen);
1983                 } else {
1984                         result = (last != NULL) ? 0 :-EINVAL;
1985                         goto out;
1986                 }
1987                 last = ent;
1988                 ent = (void *)ent + recsize;
1989                 nob -= recsize;
1990
1991 next:
1992                 result = iops->next(env, it);
1993                 if (result == -ESTALE)
1994                         goto next;
1995         } while (result == 0);
1996
1997 out:
1998         dp->ldp_hash_end = cpu_to_le64(hash);
1999         if (last != NULL) {
2000                 if (last->lde_hash == dp->ldp_hash_end)
2001                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2002                 last->lde_reclen = 0; /* end mark */
2003         }
2004         if (result > 0)
2005                 /* end of directory */
2006                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2007         if (result < 0)
2008                 CWARN("build page failed: %d!\n", result);
2009         return result;
2010 }
2011
2012 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2013                  const struct lu_rdpg *rdpg)
2014 {
2015         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2016         int rc;
2017         ENTRY;
2018
2019         if (mdd_object_exists(mdd_obj) == 0) {
2020                 CERROR("%s: object "DFID" not found: rc = -2\n",
2021                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2022                 return -ENOENT;
2023         }
2024
2025         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2026         rc = mdd_readpage_sanity_check(env, mdd_obj);
2027         if (rc)
2028                 GOTO(out_unlock, rc);
2029
2030         if (mdd_is_dead_obj(mdd_obj)) {
2031                 struct page *pg;
2032                 struct lu_dirpage *dp;
2033
2034                 /*
2035                  * According to POSIX, please do not return any entry to client:
2036                  * even dot and dotdot should not be returned.
2037                  */
2038                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2039                        PFID(mdd_object_fid(mdd_obj)));
2040
2041                 if (rdpg->rp_count <= 0)
2042                         GOTO(out_unlock, rc = -EFAULT);
2043                 LASSERT(rdpg->rp_pages != NULL);
2044
2045                 pg = rdpg->rp_pages[0];
2046                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2047                 memset(dp, 0 , sizeof(struct lu_dirpage));
2048                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2049                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2050                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2051                 cfs_kunmap(pg);
2052                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2053         }
2054
2055         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2056                            mdd_dir_page_build, NULL);
2057         if (rc >= 0) {
2058                 struct lu_dirpage       *dp;
2059
2060                 dp = cfs_kmap(rdpg->rp_pages[0]);
2061                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2062                 if (rc == 0) {
2063                         /*
2064                          * No pages were processed, mark this for first page
2065                          * and send back.
2066                          */
2067                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2068                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2069                 }
2070                 cfs_kunmap(rdpg->rp_pages[0]);
2071         }
2072
2073         GOTO(out_unlock, rc);
2074 out_unlock:
2075         mdd_read_unlock(env, mdd_obj);
2076         return rc;
2077 }
2078
2079 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2080 {
2081         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2082
2083         if (mdd_object_exists(mdd_obj) == 0) {
2084                 CERROR("%s: object "DFID" not found: rc = -2\n",
2085                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2086                 return -ENOENT;
2087         }
2088         return dt_object_sync(env, mdd_object_child(mdd_obj));
2089 }
2090
2091 const struct md_object_operations mdd_obj_ops = {
2092         .moo_permission         = mdd_permission,
2093         .moo_attr_get           = mdd_attr_get,
2094         .moo_attr_set           = mdd_attr_set,
2095         .moo_xattr_get          = mdd_xattr_get,
2096         .moo_xattr_set          = mdd_xattr_set,
2097         .moo_xattr_list         = mdd_xattr_list,
2098         .moo_xattr_del          = mdd_xattr_del,
2099         .moo_swap_layouts       = mdd_swap_layouts,
2100         .moo_open               = mdd_open,
2101         .moo_close              = mdd_close,
2102         .moo_readpage           = mdd_readpage,
2103         .moo_readlink           = mdd_readlink,
2104         .moo_changelog          = mdd_changelog,
2105         .moo_capa_get           = mdd_capa_get,
2106         .moo_object_sync        = mdd_object_sync,
2107         .moo_path               = mdd_path,
2108 };