Whamcloud - gitweb
5dbb9ef614fc5e4c8775cddc9dbe8e247bec9435
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 const struct lu_name *mdd_name_get_const(const struct lu_env *env,
111                                          const void *area, ssize_t len)
112 {
113         struct lu_name *lname;
114
115         lname = &mdd_env_info(env)->mti_name;
116         lname->ln_name = area;
117         lname->ln_namelen = len;
118         return lname;
119 }
120
121 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 {
123         struct lu_buf *buf;
124
125         buf = &mdd_env_info(env)->mti_buf;
126         buf->lb_buf = area;
127         buf->lb_len = len;
128         return buf;
129 }
130
131 void mdd_buf_put(struct lu_buf *buf)
132 {
133         if (buf == NULL || buf->lb_buf == NULL)
134                 return;
135         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
136         *buf = LU_BUF_NULL;
137 }
138
139 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
140                                        const void *area, ssize_t len)
141 {
142         struct lu_buf *buf;
143
144         buf = &mdd_env_info(env)->mti_buf;
145         buf->lb_buf = (void *)area;
146         buf->lb_len = len;
147         return buf;
148 }
149
150 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
151 {
152         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
153
154         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
155                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156                 *buf = LU_BUF_NULL;
157         }
158         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
159                 buf->lb_len = len;
160                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
161                 if (buf->lb_buf == NULL)
162                         *buf = LU_BUF_NULL;
163         }
164         return buf;
165 }
166
167 /** Increase the size of the \a mti_big_buf.
168  * preserves old data in buffer
169  * old buffer remains unchanged on error
170  * \retval 0 or -ENOMEM
171  */
172 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
173 {
174         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175         struct lu_buf buf;
176
177         LASSERT(len >= oldbuf->lb_len);
178         OBD_ALLOC_LARGE(buf.lb_buf, len);
179
180         if (buf.lb_buf == NULL)
181                 return -ENOMEM;
182
183         buf.lb_len = len;
184         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
185
186         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
187
188         memcpy(oldbuf, &buf, sizeof(buf));
189
190         return 0;
191 }
192
193 struct lu_object *mdd_object_alloc(const struct lu_env *env,
194                                    const struct lu_object_header *hdr,
195                                    struct lu_device *d)
196 {
197         struct mdd_object *mdd_obj;
198
199         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
200         if (mdd_obj != NULL) {
201                 struct lu_object *o;
202
203                 o = mdd2lu_obj(mdd_obj);
204                 lu_object_init(o, NULL, d);
205                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
206                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
207                 mdd_obj->mod_count = 0;
208                 o->lo_ops = &mdd_lu_obj_ops;
209                 return o;
210         } else {
211                 return NULL;
212         }
213 }
214
215 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
216                            const struct lu_object_conf *unused)
217 {
218         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
219         struct mdd_object *mdd_obj = lu2mdd_obj(o);
220         struct lu_object  *below;
221         struct lu_device  *under;
222         ENTRY;
223
224         mdd_obj->mod_cltime = 0;
225         under = &d->mdd_child->dd_lu_dev;
226         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
227         mdd_pdlock_init(mdd_obj);
228         if (IS_ERR(below))
229                 RETURN(PTR_ERR(below));
230
231         lu_object_add(o, below);
232
233         RETURN(0);
234 }
235
236 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
237 {
238         if (lu_object_exists(o))
239                 return mdd_get_flags(env, lu2mdd_obj(o));
240         else
241                 return 0;
242 }
243
244 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
245 {
246         struct mdd_object *mdd = lu2mdd_obj(o);
247
248         lu_object_fini(o);
249         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
250 }
251
252 static int mdd_object_print(const struct lu_env *env, void *cookie,
253                             lu_printer_t p, const struct lu_object *o)
254 {
255         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
256         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
257                     "valid=%x, cltime="LPU64", flags=%lx)",
258                     mdd, mdd->mod_count, mdd->mod_valid,
259                     mdd->mod_cltime, mdd->mod_flags);
260 }
261
262 static const struct lu_object_operations mdd_lu_obj_ops = {
263         .loo_object_init    = mdd_object_init,
264         .loo_object_start   = mdd_object_start,
265         .loo_object_free    = mdd_object_free,
266         .loo_object_print   = mdd_object_print,
267 };
268
269 struct mdd_object *mdd_object_find(const struct lu_env *env,
270                                    struct mdd_device *d,
271                                    const struct lu_fid *f)
272 {
273         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
274 }
275
276 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
277                         const char *path, struct lu_fid *fid)
278 {
279         struct lu_buf *buf;
280         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
281         struct mdd_object *obj;
282         struct lu_name *lname = &mdd_env_info(env)->mti_name;
283         char *name;
284         int rc = 0;
285         ENTRY;
286
287         /* temp buffer for path element */
288         buf = mdd_buf_alloc(env, PATH_MAX);
289         if (buf->lb_buf == NULL)
290                 RETURN(-ENOMEM);
291
292         lname->ln_name = name = buf->lb_buf;
293         lname->ln_namelen = 0;
294         *f = mdd->mdd_root_fid;
295
296         while(1) {
297                 while (*path == '/')
298                         path++;
299                 if (*path == '\0')
300                         break;
301                 while (*path != '/' && *path != '\0') {
302                         *name = *path;
303                         path++;
304                         name++;
305                         lname->ln_namelen++;
306                 }
307
308                 *name = '\0';
309                 /* find obj corresponding to fid */
310                 obj = mdd_object_find(env, mdd, f);
311                 if (obj == NULL)
312                         GOTO(out, rc = -EREMOTE);
313                 if (IS_ERR(obj))
314                         GOTO(out, rc = PTR_ERR(obj));
315                 /* get child fid from parent and name */
316                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
317                 mdd_object_put(env, obj);
318                 if (rc)
319                         break;
320
321                 name = buf->lb_buf;
322                 lname->ln_namelen = 0;
323         }
324
325         if (!rc)
326                 *fid = *f;
327 out:
328         RETURN(rc);
329 }
330
331 /** The maximum depth that fid2path() will search.
332  * This is limited only because we want to store the fids for
333  * historical path lookup purposes.
334  */
335 #define MAX_PATH_DEPTH 100
336
337 /** mdd_path() lookup structure. */
338 struct path_lookup_info {
339         __u64                pli_recno;        /**< history point */
340         __u64                pli_currec;       /**< current record */
341         struct lu_fid        pli_fid;
342         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
343         struct mdd_object   *pli_mdd_obj;
344         char                *pli_path;         /**< full path */
345         int                  pli_pathlen;
346         int                  pli_linkno;       /**< which hardlink to follow */
347         int                  pli_fidcount;     /**< number of \a pli_fids */
348 };
349
350 static int mdd_path_current(const struct lu_env *env,
351                             struct path_lookup_info *pli)
352 {
353         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
354         struct mdd_object *mdd_obj;
355         struct lu_buf     *buf = NULL;
356         struct link_ea_header *leh;
357         struct link_ea_entry  *lee;
358         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
359         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
360         char *ptr;
361         int reclen;
362         int rc;
363         ENTRY;
364
365         ptr = pli->pli_path + pli->pli_pathlen - 1;
366         *ptr = 0;
367         --ptr;
368         pli->pli_fidcount = 0;
369         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
370
371         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
372                 mdd_obj = mdd_object_find(env, mdd,
373                                           &pli->pli_fids[pli->pli_fidcount]);
374                 if (mdd_obj == NULL)
375                         GOTO(out, rc = -EREMOTE);
376                 if (IS_ERR(mdd_obj))
377                         GOTO(out, rc = PTR_ERR(mdd_obj));
378                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
379                 if (rc <= 0) {
380                         mdd_object_put(env, mdd_obj);
381                         if (rc == -1)
382                                 rc = -EREMOTE;
383                         else if (rc == 0)
384                                 /* Do I need to error out here? */
385                                 rc = -ENOENT;
386                         GOTO(out, rc);
387                 }
388
389                 /* Get parent fid and object name */
390                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
391                 buf = mdd_links_get(env, mdd_obj);
392                 mdd_read_unlock(env, mdd_obj);
393                 mdd_object_put(env, mdd_obj);
394                 if (IS_ERR(buf))
395                         GOTO(out, rc = PTR_ERR(buf));
396
397                 leh = buf->lb_buf;
398                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
399                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
400
401                 /* If set, use link #linkno for path lookup, otherwise use
402                    link #0.  Only do this for the final path element. */
403                 if ((pli->pli_fidcount == 0) &&
404                     (pli->pli_linkno < leh->leh_reccount)) {
405                         int count;
406                         for (count = 0; count < pli->pli_linkno; count++) {
407                                 lee = (struct link_ea_entry *)
408                                      ((char *)lee + reclen);
409                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
410                         }
411                         if (pli->pli_linkno < leh->leh_reccount - 1)
412                                 /* indicate to user there are more links */
413                                 pli->pli_linkno++;
414                 }
415
416                 /* Pack the name in the end of the buffer */
417                 ptr -= tmpname->ln_namelen;
418                 if (ptr - 1 <= pli->pli_path)
419                         GOTO(out, rc = -EOVERFLOW);
420                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
421                 *(--ptr) = '/';
422
423                 /* Store the parent fid for historic lookup */
424                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
425                         GOTO(out, rc = -EOVERFLOW);
426                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
427         }
428
429         /* Verify that our path hasn't changed since we started the lookup.
430            Record the current index, and verify the path resolves to the
431            same fid. If it does, then the path is correct as of this index. */
432         spin_lock(&mdd->mdd_cl.mc_lock);
433         pli->pli_currec = mdd->mdd_cl.mc_index;
434         spin_unlock(&mdd->mdd_cl.mc_lock);
435         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
436         if (rc) {
437                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
438                 GOTO (out, rc = -EAGAIN);
439         }
440         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
441                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
442                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
443                        PFID(&pli->pli_fid));
444                 GOTO(out, rc = -EAGAIN);
445         }
446         ptr++; /* skip leading / */
447         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
448
449         EXIT;
450 out:
451         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
452                 /* if we vmalloced a large buffer drop it */
453                 mdd_buf_put(buf);
454
455         return rc;
456 }
457
458 static int mdd_path_historic(const struct lu_env *env,
459                              struct path_lookup_info *pli)
460 {
461         return 0;
462 }
463
464 /* Returns the full path to this fid, as of changelog record recno. */
465 static int mdd_path(const struct lu_env *env, struct md_object *obj,
466                     char *path, int pathlen, __u64 *recno, int *linkno)
467 {
468         struct path_lookup_info *pli;
469         int tries = 3;
470         int rc = -EAGAIN;
471         ENTRY;
472
473         if (pathlen < 3)
474                 RETURN(-EOVERFLOW);
475
476         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
477                 path[0] = '\0';
478                 RETURN(0);
479         }
480
481         OBD_ALLOC_PTR(pli);
482         if (pli == NULL)
483                 RETURN(-ENOMEM);
484
485         pli->pli_mdd_obj = md2mdd_obj(obj);
486         pli->pli_recno = *recno;
487         pli->pli_path = path;
488         pli->pli_pathlen = pathlen;
489         pli->pli_linkno = *linkno;
490
491         /* Retry multiple times in case file is being moved */
492         while (tries-- && rc == -EAGAIN)
493                 rc = mdd_path_current(env, pli);
494
495         /* For historical path lookup, the current links may not have existed
496          * at "recno" time.  We must switch over to earlier links/parents
497          * by using the changelog records.  If the earlier parent doesn't
498          * exist, we must search back through the changelog to reconstruct
499          * its parents, then check if it exists, etc.
500          * We may ignore this problem for the initial implementation and
501          * state that an "original" hardlink must still exist for us to find
502          * historic path name. */
503         if (pli->pli_recno != -1) {
504                 rc = mdd_path_historic(env, pli);
505         } else {
506                 *recno = pli->pli_currec;
507                 /* Return next link index to caller */
508                 *linkno = pli->pli_linkno;
509         }
510
511         OBD_FREE_PTR(pli);
512
513         RETURN (rc);
514 }
515
516 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
517 {
518         struct lu_attr *la = &mdd_env_info(env)->mti_la;
519         int rc;
520
521         ENTRY;
522         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
523         if (rc == 0) {
524                 mdd_flags_xlate(obj, la->la_flags);
525         }
526         RETURN(rc);
527 }
528
529 /*
530  * No permission check is needed.
531  */
532 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
533                  struct md_attr *ma)
534 {
535         int rc;
536         ENTRY;
537
538         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
539                           mdd_object_capa(env, md2mdd_obj(obj)));
540         RETURN(rc);
541 }
542
543 /*
544  * No permission check is needed.
545  */
546 static int mdd_xattr_get(const struct lu_env *env,
547                          struct md_object *obj, struct lu_buf *buf,
548                          const char *name)
549 {
550         struct mdd_object *mdd_obj = md2mdd_obj(obj);
551         int rc;
552
553         ENTRY;
554
555         if (mdd_object_exists(mdd_obj) == 0) {
556                 CERROR("%s: object "DFID" not found: rc = -2\n",
557                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
558                 return -ENOENT;
559         }
560
561         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
562         rc = mdo_xattr_get(env, mdd_obj, buf, name,
563                            mdd_object_capa(env, mdd_obj));
564         mdd_read_unlock(env, mdd_obj);
565
566         RETURN(rc);
567 }
568
569 /*
570  * Permission check is done when open,
571  * no need check again.
572  */
573 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
574                         struct lu_buf *buf)
575 {
576         struct mdd_object *mdd_obj = md2mdd_obj(obj);
577         struct dt_object  *next;
578         loff_t             pos = 0;
579         int                rc;
580         ENTRY;
581
582         if (mdd_object_exists(mdd_obj) == 0) {
583                 CERROR("%s: object "DFID" not found: rc = -2\n",
584                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
585                 return -ENOENT;
586         }
587
588         next = mdd_object_child(mdd_obj);
589         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
590         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
591                                          mdd_object_capa(env, mdd_obj));
592         mdd_read_unlock(env, mdd_obj);
593         RETURN(rc);
594 }
595
596 /*
597  * No permission check is needed.
598  */
599 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
600                           struct lu_buf *buf)
601 {
602         struct mdd_object *mdd_obj = md2mdd_obj(obj);
603         int rc;
604
605         ENTRY;
606
607         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
608         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
609         mdd_read_unlock(env, mdd_obj);
610
611         RETURN(rc);
612 }
613
614 int mdd_declare_object_create_internal(const struct lu_env *env,
615                                        struct mdd_object *p,
616                                        struct mdd_object *c,
617                                        struct lu_attr *attr,
618                                        struct thandle *handle,
619                                        const struct md_op_spec *spec)
620 {
621         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
622         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
623         const struct dt_index_features *feat = spec->sp_feat;
624         int rc;
625         ENTRY;
626
627         if (feat != &dt_directory_features && feat != NULL) {
628                 dof->dof_type = DFT_INDEX;
629                 dof->u.dof_idx.di_feat = feat;
630
631         } else {
632                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
633                 if (dof->dof_type == DFT_REGULAR) {
634                         dof->u.dof_reg.striped =
635                                 md_should_create(spec->sp_cr_flags);
636                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
637                                 dof->u.dof_reg.striped = 0;
638                         /* is this replay? */
639                         if (spec->no_create)
640                                 dof->u.dof_reg.striped = 0;
641                 }
642         }
643
644         rc = mdo_declare_create_obj(env, c, attr, hint, dof, handle);
645
646         RETURN(rc);
647 }
648
649 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
650                                struct mdd_object *c, struct lu_attr *attr,
651                                struct thandle *handle,
652                                const struct md_op_spec *spec)
653 {
654         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
655         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
656         int rc;
657         ENTRY;
658
659         LASSERT(!mdd_object_exists(c));
660
661         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
662
663         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
664
665         RETURN(rc);
666 }
667
668 /**
669  * Make sure the ctime is increased only.
670  */
671 static inline int mdd_attr_check(const struct lu_env *env,
672                                  struct mdd_object *obj,
673                                  struct lu_attr *attr)
674 {
675         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
676         int rc;
677         ENTRY;
678
679         if (attr->la_valid & LA_CTIME) {
680                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
681                 if (rc)
682                         RETURN(rc);
683
684                 if (attr->la_ctime < tmp_la->la_ctime)
685                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
686                 else if (attr->la_valid == LA_CTIME &&
687                          attr->la_ctime == tmp_la->la_ctime)
688                         attr->la_valid &= ~LA_CTIME;
689         }
690         RETURN(0);
691 }
692
693 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
694                           struct lu_attr *attr, struct thandle *handle,
695                           int needacl)
696 {
697         int rc;
698         ENTRY;
699
700         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
701 #ifdef CONFIG_FS_POSIX_ACL
702         if (!rc && (attr->la_valid & LA_MODE) && needacl)
703                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
704 #endif
705         RETURN(rc);
706 }
707
708 int mdd_attr_check_set_internal(const struct lu_env *env,
709                                 struct mdd_object *obj, struct lu_attr *attr,
710                                 struct thandle *handle, int needacl)
711 {
712         int rc;
713         ENTRY;
714
715         rc = mdd_attr_check(env, obj, attr);
716         if (rc)
717                 RETURN(rc);
718
719         if (attr->la_valid)
720                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
721         RETURN(rc);
722 }
723
724 /*
725  * This gives the same functionality as the code between
726  * sys_chmod and inode_setattr
727  * chown_common and inode_setattr
728  * utimes and inode_setattr
729  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
730  */
731 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
732                         struct lu_attr *la, const unsigned long flags)
733 {
734         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
735         struct lu_ucred  *uc;
736         int               rc;
737         ENTRY;
738
739         if (!la->la_valid)
740                 RETURN(0);
741
742         /* Do not permit change file type */
743         if (la->la_valid & LA_TYPE)
744                 RETURN(-EPERM);
745
746         /* They should not be processed by setattr */
747         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
748                 RETURN(-EPERM);
749
750         /* export destroy does not have ->le_ses, but we may want
751          * to drop LUSTRE_SOM_FL. */
752         uc = lu_ucred_check(env);
753         if (uc == NULL)
754                 RETURN(0);
755
756         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
757         if (rc)
758                 RETURN(rc);
759
760         if (la->la_valid == LA_CTIME) {
761                 if (!(flags & MDS_PERM_BYPASS))
762                         /* This is only for set ctime when rename's source is
763                          * on remote MDS. */
764                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
765                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
766                         la->la_valid &= ~LA_CTIME;
767                 RETURN(rc);
768         }
769
770         if (la->la_valid == LA_ATIME) {
771                 /* This is atime only set for read atime update on close. */
772                 if (la->la_atime >= tmp_la->la_atime &&
773                     la->la_atime < (tmp_la->la_atime +
774                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
775                         la->la_valid &= ~LA_ATIME;
776                 RETURN(0);
777         }
778
779         /* Check if flags change. */
780         if (la->la_valid & LA_FLAGS) {
781                 unsigned int oldflags = 0;
782                 unsigned int newflags = la->la_flags &
783                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
784
785                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
786                     !md_capable(uc, CFS_CAP_FOWNER))
787                         RETURN(-EPERM);
788
789                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
790                  * only be changed by the relevant capability. */
791                 if (mdd_is_immutable(obj))
792                         oldflags |= LUSTRE_IMMUTABLE_FL;
793                 if (mdd_is_append(obj))
794                         oldflags |= LUSTRE_APPEND_FL;
795                 if ((oldflags ^ newflags) &&
796                     !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
797                         RETURN(-EPERM);
798
799                 if (!S_ISDIR(tmp_la->la_mode))
800                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
801         }
802
803         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
804             (la->la_valid & ~LA_FLAGS) &&
805             !(flags & MDS_PERM_BYPASS))
806                 RETURN(-EPERM);
807
808         /* Check for setting the obj time. */
809         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
810             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
811                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
812                     !md_capable(uc, CFS_CAP_FOWNER)) {
813                         rc = mdd_permission_internal(env, obj, tmp_la,
814                                                      MAY_WRITE);
815                         if (rc)
816                                 RETURN(rc);
817                 }
818         }
819
820         if (la->la_valid & LA_KILL_SUID) {
821                 la->la_valid &= ~LA_KILL_SUID;
822                 if ((tmp_la->la_mode & S_ISUID) &&
823                     !(la->la_valid & LA_MODE)) {
824                         la->la_mode = tmp_la->la_mode;
825                         la->la_valid |= LA_MODE;
826                 }
827                 la->la_mode &= ~S_ISUID;
828         }
829
830         if (la->la_valid & LA_KILL_SGID) {
831                 la->la_valid &= ~LA_KILL_SGID;
832                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
833                                         (S_ISGID | S_IXGRP)) &&
834                     !(la->la_valid & LA_MODE)) {
835                         la->la_mode = tmp_la->la_mode;
836                         la->la_valid |= LA_MODE;
837                 }
838                 la->la_mode &= ~S_ISGID;
839         }
840
841         /* Make sure a caller can chmod. */
842         if (la->la_valid & LA_MODE) {
843                 if (!(flags & MDS_PERM_BYPASS) &&
844                     (uc->uc_fsuid != tmp_la->la_uid) &&
845                     !md_capable(uc, CFS_CAP_FOWNER))
846                         RETURN(-EPERM);
847
848                 if (la->la_mode == (cfs_umode_t) -1)
849                         la->la_mode = tmp_la->la_mode;
850                 else
851                         la->la_mode = (la->la_mode & S_IALLUGO) |
852                                       (tmp_la->la_mode & ~S_IALLUGO);
853
854                 /* Also check the setgid bit! */
855                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
856                                        la->la_gid : tmp_la->la_gid) &&
857                     !md_capable(uc, CFS_CAP_FSETID))
858                         la->la_mode &= ~S_ISGID;
859         } else {
860                la->la_mode = tmp_la->la_mode;
861         }
862
863         /* Make sure a caller can chown. */
864         if (la->la_valid & LA_UID) {
865                 if (la->la_uid == (uid_t) -1)
866                         la->la_uid = tmp_la->la_uid;
867                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
868                      (la->la_uid != tmp_la->la_uid)) &&
869                     !md_capable(uc, CFS_CAP_CHOWN))
870                         RETURN(-EPERM);
871
872                 /* If the user or group of a non-directory has been
873                  * changed by a non-root user, remove the setuid bit.
874                  * 19981026 David C Niemi <niemi@tux.org>
875                  *
876                  * Changed this to apply to all users, including root,
877                  * to avoid some races. This is the behavior we had in
878                  * 2.0. The check for non-root was definitely wrong
879                  * for 2.2 anyway, as it should have been using
880                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
881                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
882                     !S_ISDIR(tmp_la->la_mode)) {
883                         la->la_mode &= ~S_ISUID;
884                         la->la_valid |= LA_MODE;
885                 }
886         }
887
888         /* Make sure caller can chgrp. */
889         if (la->la_valid & LA_GID) {
890                 if (la->la_gid == (gid_t) -1)
891                         la->la_gid = tmp_la->la_gid;
892                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
893                      ((la->la_gid != tmp_la->la_gid) &&
894                       !lustre_in_group_p(uc, la->la_gid))) &&
895                     !md_capable(uc, CFS_CAP_CHOWN))
896                         RETURN(-EPERM);
897
898                 /* Likewise, if the user or group of a non-directory
899                  * has been changed by a non-root user, remove the
900                  * setgid bit UNLESS there is no group execute bit
901                  * (this would be a file marked for mandatory
902                  * locking).  19981026 David C Niemi <niemi@tux.org>
903                  *
904                  * Removed the fsuid check (see the comment above) --
905                  * 19990830 SD. */
906                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
907                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
908                         la->la_mode &= ~S_ISGID;
909                         la->la_valid |= LA_MODE;
910                 }
911         }
912
913         /* For both Size-on-MDS case and truncate case,
914          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
915          * We distinguish them by "flags & MDS_SOM".
916          * For SOM case, it is true, the MAY_WRITE perm has been checked
917          * when open, no need check again. For truncate case, it is false,
918          * the MAY_WRITE perm should be checked here. */
919         if (flags & MDS_SOM) {
920                 /* For the "Size-on-MDS" setattr update, merge coming
921                  * attributes with the set in the inode. BUG 10641 */
922                 if ((la->la_valid & LA_ATIME) &&
923                     (la->la_atime <= tmp_la->la_atime))
924                         la->la_valid &= ~LA_ATIME;
925
926                 /* OST attributes do not have a priority over MDS attributes,
927                  * so drop times if ctime is equal. */
928                 if ((la->la_valid & LA_CTIME) &&
929                     (la->la_ctime <= tmp_la->la_ctime))
930                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
931         } else {
932                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
933                         if (!((flags & MDS_OWNEROVERRIDE) &&
934                               (uc->uc_fsuid == tmp_la->la_uid)) &&
935                             !(flags & MDS_PERM_BYPASS)) {
936                                 rc = mdd_permission_internal(env, obj,
937                                                              tmp_la, MAY_WRITE);
938                                 if (rc != 0)
939                                         RETURN(rc);
940                         }
941                 }
942                 if (la->la_valid & LA_CTIME) {
943                         /* The pure setattr, it has the priority over what is
944                          * already set, do not drop it if ctime is equal. */
945                         if (la->la_ctime < tmp_la->la_ctime)
946                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
947                                                   LA_CTIME);
948                 }
949         }
950
951         RETURN(0);
952 }
953
954 /** Store a data change changelog record
955  * If this fails, we must fail the whole transaction; we don't
956  * want the change to commit without the log entry.
957  * \param mdd_obj - mdd_object of change
958  * \param handle - transacion handle
959  */
960 static int mdd_changelog_data_store(const struct lu_env *env,
961                                     struct mdd_device *mdd,
962                                     enum changelog_rec_type type,
963                                     int flags, struct mdd_object *mdd_obj,
964                                     struct thandle *handle)
965 {
966         const struct lu_fid             *tfid;
967         struct llog_changelog_rec       *rec;
968         struct lu_buf                   *buf;
969         int                              reclen;
970         int                              rc;
971
972         /* Not recording */
973         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
974                 RETURN(0);
975         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
976                 RETURN(0);
977
978         LASSERT(mdd_obj != NULL);
979         LASSERT(handle != NULL);
980
981         tfid = mdo2fid(mdd_obj);
982
983         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
984             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
985                 /* Don't need multiple updates in this log */
986                 /* Don't check under lock - no big deal if we get an extra
987                    entry */
988                 RETURN(0);
989         }
990
991         reclen = llog_data_len(sizeof(*rec));
992         buf = mdd_buf_alloc(env, reclen);
993         if (buf->lb_buf == NULL)
994                 RETURN(-ENOMEM);
995         rec = buf->lb_buf;
996
997         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
998         rec->cr.cr_type = (__u32)type;
999         rec->cr.cr_tfid = *tfid;
1000         rec->cr.cr_namelen = 0;
1001         mdd_obj->mod_cltime = cfs_time_current_64();
1002
1003         rc = mdd_changelog_store(env, mdd, rec, handle);
1004
1005         RETURN(rc);
1006 }
1007
1008 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1009                   int flags, struct md_object *obj)
1010 {
1011         struct thandle *handle;
1012         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1013         struct mdd_device *mdd = mdo2mdd(obj);
1014         int rc;
1015         ENTRY;
1016
1017         handle = mdd_trans_create(env, mdd);
1018         if (IS_ERR(handle))
1019                 RETURN(PTR_ERR(handle));
1020
1021         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1022         if (rc)
1023                 GOTO(stop, rc);
1024
1025         rc = mdd_trans_start(env, mdd, handle);
1026         if (rc)
1027                 GOTO(stop, rc);
1028
1029         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1030                                       handle);
1031
1032 stop:
1033         mdd_trans_stop(env, mdd, rc, handle);
1034
1035         RETURN(rc);
1036 }
1037
1038 /**
1039  * Save LMA extended attributes with data from \a ma.
1040  *
1041  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1042  * not, LMA EA will be first read from disk, modified and write back.
1043  *
1044  */
1045 /* Precedence for choosing record type when multiple
1046  * attributes change: setattr > mtime > ctime > atime
1047  * (ctime changes when mtime does, plus chmod/chown.
1048  * atime and ctime are independent.) */
1049 static int mdd_attr_set_changelog(const struct lu_env *env,
1050                                   struct md_object *obj, struct thandle *handle,
1051                                   __u64 valid)
1052 {
1053         struct mdd_device *mdd = mdo2mdd(obj);
1054         int bits, type = 0;
1055
1056         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1057         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1058         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1059         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1060         bits = bits & mdd->mdd_cl.mc_mask;
1061         if (bits == 0)
1062                 return 0;
1063
1064         /* The record type is the lowest non-masked set bit */
1065         while (bits && ((bits & 1) == 0)) {
1066                 bits = bits >> 1;
1067                 type++;
1068         }
1069
1070         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1071         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1072                                         md2mdd_obj(obj), handle);
1073 }
1074
1075 static int mdd_declare_attr_set(const struct lu_env *env,
1076                                 struct mdd_device *mdd,
1077                                 struct mdd_object *obj,
1078                                 const struct lu_attr *attr,
1079                                 struct thandle *handle)
1080 {
1081         int rc;
1082
1083         rc = mdo_declare_attr_set(env, obj, attr, handle);
1084         if (rc)
1085                 return rc;
1086
1087 #ifdef CONFIG_FS_POSIX_ACL
1088         if (attr->la_valid & LA_MODE) {
1089                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1090                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1091                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1092                 mdd_read_unlock(env, obj);
1093                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1094                         rc = 0;
1095                 else if (rc < 0)
1096                         return rc;
1097
1098                 if (rc != 0) {
1099                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1100                         rc = mdo_declare_xattr_set(env, obj, buf,
1101                                                    XATTR_NAME_ACL_ACCESS, 0,
1102                                                    handle);
1103                         if (rc)
1104                                 return rc;
1105                 }
1106         }
1107 #endif
1108
1109         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1110         return rc;
1111 }
1112
1113 /* set attr and LOV EA at once, return updated attr */
1114 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1115                  const struct md_attr *ma)
1116 {
1117         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1118         struct mdd_device *mdd = mdo2mdd(obj);
1119         struct thandle *handle;
1120         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1121         const struct lu_attr *la = &ma->ma_attr;
1122         int rc;
1123         ENTRY;
1124
1125         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1126         LASSERT((ma->ma_valid & MA_LOV) == 0);
1127         LASSERT((ma->ma_valid & MA_HSM) == 0);
1128         LASSERT((ma->ma_valid & MA_SOM) == 0);
1129
1130         *la_copy = ma->ma_attr;
1131         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1132         if (rc)
1133                 RETURN(rc);
1134
1135         /* setattr on "close" only change atime, or do nothing */
1136         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1137                 RETURN(0);
1138
1139         handle = mdd_trans_create(env, mdd);
1140         if (IS_ERR(handle))
1141                 RETURN(PTR_ERR(handle));
1142
1143         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1144         if (rc)
1145                 GOTO(stop, rc);
1146
1147         rc = mdd_trans_start(env, mdd, handle);
1148         if (rc)
1149                 GOTO(stop, rc);
1150
1151         /* permission changes may require sync operation */
1152         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1153                 handle->th_sync |= !!mdd->mdd_sync_permission;
1154
1155         if (la->la_valid & (LA_MTIME | LA_CTIME))
1156                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1157                        la->la_mtime, la->la_ctime);
1158
1159         if (la_copy->la_valid & LA_FLAGS) {
1160                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1161                 if (rc == 0)
1162                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1163         } else if (la_copy->la_valid) {            /* setattr */
1164                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1165         }
1166
1167         if (rc == 0)
1168                 rc = mdd_attr_set_changelog(env, obj, handle,
1169                                             la->la_valid);
1170 stop:
1171         mdd_trans_stop(env, mdd, rc, handle);
1172         RETURN(rc);
1173 }
1174
1175 static int mdd_xattr_sanity_check(const struct lu_env *env,
1176                                   struct mdd_object *obj)
1177 {
1178         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1179         struct lu_ucred *uc     = lu_ucred_assert(env);
1180         int rc;
1181         ENTRY;
1182
1183         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1184                 RETURN(-EPERM);
1185
1186         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1187         if (rc)
1188                 RETURN(rc);
1189
1190         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1191             !md_capable(uc, CFS_CAP_FOWNER))
1192                 RETURN(-EPERM);
1193
1194         RETURN(rc);
1195 }
1196
1197 static int mdd_declare_xattr_set(const struct lu_env *env,
1198                                  struct mdd_device *mdd,
1199                                  struct mdd_object *obj,
1200                                  const struct lu_buf *buf,
1201                                  const char *name,
1202                                  int fl, struct thandle *handle)
1203 {
1204         int     rc;
1205
1206         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1207         if (rc)
1208                 return rc;
1209
1210         /* Only record user xattr changes */
1211         if ((strncmp("user.", name, 5) == 0)) {
1212                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1213                 if (rc)
1214                         return rc;
1215         }
1216
1217         /* If HSM data is modified, this could add a changelog */
1218         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
1219                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1220
1221         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1222         return rc;
1223 }
1224
1225 /*
1226  * Compare current and future data of HSM EA and add a changelog if needed.
1227  *
1228  * Caller should have write-locked \param obj.
1229  *
1230  * \param buf - Future HSM EA content.
1231  * \retval 0 if no changelog is needed or changelog was added properly.
1232  * \retval -ve errno if there was a problem
1233  */
1234 static int mdd_hsm_update_locked(const struct lu_env *env,
1235                                  struct md_object *obj,
1236                                  const struct lu_buf *buf,
1237                                  struct thandle *handle)
1238 {
1239         struct mdd_thread_info *info = mdd_env_info(env);
1240         struct mdd_device      *mdd = mdo2mdd(obj);
1241         struct mdd_object      *mdd_obj = md2mdd_obj(obj);
1242         struct lu_buf          *current_buf = &info->mti_buf;
1243         struct md_hsm          *current_mh;
1244         struct md_hsm          *new_mh;
1245         int                     rc;
1246         ENTRY;
1247
1248         OBD_ALLOC_PTR(current_mh);
1249         if (current_mh == NULL)
1250                 RETURN(-ENOMEM);
1251
1252         /* Read HSM attrs from disk */
1253         current_buf->lb_buf = info->mti_xattr_buf;
1254         current_buf->lb_len = sizeof(info->mti_xattr_buf);
1255         CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
1256         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
1257                            mdd_object_capa(env, mdd_obj));
1258         rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
1259         if (rc < 0 && rc != -ENODATA)
1260                 GOTO(free, rc);
1261         else if (rc == -ENODATA)
1262                 current_mh->mh_flags = 0;
1263
1264         /* Map future HSM xattr */
1265         OBD_ALLOC_PTR(new_mh);
1266         if (new_mh == NULL)
1267                 GOTO(free, rc = -ENOMEM);
1268         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1269
1270         /* If HSM flags are different, add a changelog */
1271         rc = 0;
1272         if (current_mh->mh_flags != new_mh->mh_flags) {
1273                 int flags = 0;
1274                 hsm_set_cl_event(&flags, HE_STATE);
1275                 if (new_mh->mh_flags & HS_DIRTY)
1276                         hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
1277
1278                 rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
1279                                               handle);
1280         }
1281
1282         OBD_FREE_PTR(new_mh);
1283 free:
1284         OBD_FREE_PTR(current_mh);
1285         return(rc);
1286 }
1287
1288
1289 /**
1290  * The caller should guarantee to update the object ctime
1291  * after xattr_set if needed.
1292  */
1293 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1294                          const struct lu_buf *buf, const char *name,
1295                          int fl)
1296 {
1297         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1298         struct mdd_device       *mdd = mdo2mdd(obj);
1299         struct thandle          *handle;
1300         int                      rc;
1301         ENTRY;
1302
1303         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1304                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1305                 RETURN(rc);
1306         }
1307
1308         rc = mdd_xattr_sanity_check(env, mdd_obj);
1309         if (rc)
1310                 RETURN(rc);
1311
1312         handle = mdd_trans_create(env, mdd);
1313         if (IS_ERR(handle))
1314                 RETURN(PTR_ERR(handle));
1315
1316         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
1317         if (rc)
1318                 GOTO(stop, rc);
1319
1320         rc = mdd_trans_start(env, mdd, handle);
1321         if (rc)
1322                 GOTO(stop, rc);
1323
1324         /* security-replated changes may require sync */
1325         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1326                 handle->th_sync |= !!mdd->mdd_sync_permission;
1327
1328         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1329
1330         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
1331                 rc = mdd_hsm_update_locked(env, obj, buf, handle);
1332                 if (rc) {
1333                         mdd_write_unlock(env, mdd_obj);
1334                         GOTO(stop, rc);
1335                 }
1336         }
1337
1338         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1339                            mdd_object_capa(env, mdd_obj));
1340         mdd_write_unlock(env, mdd_obj);
1341         if (rc)
1342                 GOTO(stop, rc);
1343
1344         /* Only record system & user xattr changes */
1345         if (strncmp(XATTR_USER_PREFIX, name,
1346                         sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1347             strncmp(POSIX_ACL_XATTR_ACCESS, name,
1348                         sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1349             strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1350                         sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1351                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1352                                               handle);
1353
1354 stop:
1355         mdd_trans_stop(env, mdd, rc, handle);
1356
1357         RETURN(rc);
1358 }
1359
1360 static int mdd_declare_xattr_del(const struct lu_env *env,
1361                                  struct mdd_device *mdd,
1362                                  struct mdd_object *obj,
1363                                  const char *name,
1364                                  struct thandle *handle)
1365 {
1366         int rc;
1367
1368         rc = mdo_declare_xattr_del(env, obj, name, handle);
1369         if (rc)
1370                 return rc;
1371
1372         /* Only record user xattr changes */
1373         if ((strncmp("user.", name, 5) == 0))
1374                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1375
1376         return rc;
1377 }
1378
1379 /**
1380  * The caller should guarantee to update the object ctime
1381  * after xattr_set if needed.
1382  */
1383 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1384                   const char *name)
1385 {
1386         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1387         struct mdd_device *mdd = mdo2mdd(obj);
1388         struct thandle *handle;
1389         int  rc;
1390         ENTRY;
1391
1392         rc = mdd_xattr_sanity_check(env, mdd_obj);
1393         if (rc)
1394                 RETURN(rc);
1395
1396         handle = mdd_trans_create(env, mdd);
1397         if (IS_ERR(handle))
1398                 RETURN(PTR_ERR(handle));
1399
1400         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1401         if (rc)
1402                 GOTO(stop, rc);
1403
1404         rc = mdd_trans_start(env, mdd, handle);
1405         if (rc)
1406                 GOTO(stop, rc);
1407
1408         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1409         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1410                            mdd_object_capa(env, mdd_obj));
1411         mdd_write_unlock(env, mdd_obj);
1412         if (rc)
1413                 GOTO(stop, rc);
1414
1415         /* Only record system & user xattr changes */
1416         if (strncmp(XATTR_USER_PREFIX, name,
1417                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1418                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1419                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1420                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1421                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1422                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1423                                               handle);
1424
1425 stop:
1426         mdd_trans_stop(env, mdd, rc, handle);
1427
1428         RETURN(rc);
1429 }
1430
1431 /*
1432  * read lov EA of an object
1433  * return the lov EA in an allocated lu_buf
1434  */
1435 static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
1436                                      struct mdd_object *obj)
1437 {
1438         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1439         struct lu_buf   *lmm_buf = NULL;
1440         int              rc, sz;
1441         ENTRY;
1442
1443 repeat:
1444         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
1445                            mdd_object_capa(env, obj));
1446         if (rc < 0)
1447                 GOTO(out, rc);
1448
1449         if (rc == 0)
1450                 GOTO(out, rc = -ENODATA);
1451
1452         sz = rc;
1453         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1454                 /* mti_big_buf was not allocated, so we have to
1455                  * allocate it based on the ea size */
1456                 buf = mdd_buf_alloc(env, sz);
1457                 if (buf->lb_buf == NULL)
1458                         GOTO(out, rc = -ENOMEM);
1459                 goto repeat;
1460         }
1461
1462         OBD_ALLOC_PTR(lmm_buf);
1463         if (!lmm_buf)
1464                 GOTO(out, rc = -ENOMEM);
1465
1466         OBD_ALLOC(lmm_buf->lb_buf, sz);
1467         if (!lmm_buf->lb_buf)
1468                 GOTO(free, rc = -ENOMEM);
1469
1470         memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
1471         lmm_buf->lb_len = sz;
1472
1473         GOTO(out, rc = 0);
1474
1475 free:
1476         if (lmm_buf)
1477                 OBD_FREE_PTR(lmm_buf);
1478 out:
1479         if (rc)
1480                 return ERR_PTR(rc);
1481         return lmm_buf;
1482 }
1483
1484
1485 /*
1486  *  check if layout swapping between 2 objects is allowed
1487  *  the rules are:
1488  *  - same type of objects
1489  *  - same owner/group (so quotas are still valid)
1490  */
1491 static int mdd_layout_swap_allowed(const struct lu_env *env,
1492                                    struct mdd_object *o1,
1493                                    struct mdd_object *o2)
1494 {
1495         const struct lu_fid     *fid1, *fid2;
1496         __u32                    uid, gid;
1497         struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
1498         int                      rc;
1499         ENTRY;
1500
1501         fid1 = mdo2fid(o1);
1502         fid2 = mdo2fid(o2);
1503
1504         if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
1505             (mdd_object_type(o1) != mdd_object_type(o2)))
1506                 RETURN(-EPERM);
1507
1508         tmp_la->la_valid = 0;
1509         rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
1510         if (rc)
1511                 RETURN(rc);
1512         uid = tmp_la->la_uid;
1513         gid = tmp_la->la_gid;
1514
1515         tmp_la->la_valid = 0;
1516         rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
1517         if (rc)
1518                 RETURN(rc);
1519
1520         if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
1521                 RETURN(-EPERM);
1522
1523         RETURN(0);
1524 }
1525
1526 /**
1527  * swap layouts between 2 lustre objects
1528  */
1529 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1530                             struct md_object *obj2, __u64 flags)
1531 {
1532         struct mdd_object       *o1, *o2, *fst_o, *snd_o;
1533         struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
1534         struct lu_buf           *fst_buf, *snd_buf;
1535         struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
1536         struct thandle          *handle;
1537         struct mdd_device       *mdd = mdo2mdd(obj1);
1538         int                      rc;
1539         __u16                    fst_gen, snd_gen;
1540         ENTRY;
1541
1542         /* we have to sort the 2 obj, so locking will always
1543          * be in the same order, even in case of 2 concurrent swaps */
1544         rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
1545                        mdo2fid(md2mdd_obj(obj2)));
1546         /* same fid ? */
1547         if (rc == 0)
1548                 RETURN(-EPERM);
1549
1550         if (rc > 0) {
1551                 o1 = md2mdd_obj(obj1);
1552                 o2 = md2mdd_obj(obj2);
1553         } else {
1554                 o1 = md2mdd_obj(obj2);
1555                 o2 = md2mdd_obj(obj1);
1556         }
1557
1558         /* check if layout swapping is allowed */
1559         rc = mdd_layout_swap_allowed(env, o1, o2);
1560         if (rc)
1561                 RETURN(rc);
1562
1563         handle = mdd_trans_create(env, mdd);
1564         if (IS_ERR(handle))
1565                 RETURN(PTR_ERR(handle));
1566
1567         /* objects are already sorted */
1568         mdd_write_lock(env, o1, MOR_TGT_CHILD);
1569         mdd_write_lock(env, o2, MOR_TGT_CHILD);
1570
1571         lmm1_buf = mdd_get_lov_ea(env, o1);
1572         if (IS_ERR(lmm1_buf)) {
1573                 rc = PTR_ERR(lmm1_buf);
1574                 lmm1_buf = NULL;
1575                 if (rc != -ENODATA)
1576                         GOTO(unlock, rc);
1577         }
1578
1579         lmm2_buf = mdd_get_lov_ea(env, o2);
1580         if (IS_ERR(lmm2_buf)) {
1581                 rc = PTR_ERR(lmm2_buf);
1582                 lmm2_buf = NULL;
1583                 if (rc != -ENODATA)
1584                         GOTO(unlock, rc);
1585         }
1586
1587         /* swapping 2 non existant layouts is a success */
1588         if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
1589                 GOTO(unlock, rc = 0);
1590
1591         /* to help inode migration between MDT, it is better to
1592          * start by the no layout file (if one), so we order the swap */
1593         if (lmm1_buf == NULL) {
1594                 fst_o = o1;
1595                 fst_buf = lmm1_buf;
1596                 snd_o = o2;
1597                 snd_buf = lmm2_buf;
1598         } else {
1599                 fst_o = o2;
1600                 fst_buf = lmm2_buf;
1601                 snd_o = o1;
1602                 snd_buf = lmm1_buf;
1603         }
1604
1605         /* lmm and generation layout initialization */
1606         if (fst_buf) {
1607                 fst_lmm = fst_buf->lb_buf;
1608                 fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
1609         } else {
1610                 fst_lmm = NULL;
1611                 fst_gen = 0;
1612         }
1613
1614         if (snd_buf) {
1615                 snd_lmm = snd_buf->lb_buf;
1616                 snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
1617         } else {
1618                 snd_lmm = NULL;
1619                 snd_gen = 0;
1620         }
1621
1622         /* save the orignal lmm common header of first file
1623          * to be able to roll back */
1624         OBD_ALLOC_PTR(old_fst_lmm);
1625         if (old_fst_lmm == NULL)
1626                 GOTO(unlock, rc = -ENOMEM);
1627
1628         memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
1629
1630         /* increase the generation layout numbers */
1631         snd_gen++;
1632         fst_gen++;
1633
1634         /* set the file specific informations in lmm */
1635         if (fst_lmm) {
1636                 fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
1637                 fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
1638                 fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
1639         }
1640
1641         if (snd_lmm) {
1642                 snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
1643                 snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1644                 snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1645         }
1646
1647         /* prepare transaction */
1648         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
1649                                    LU_XATTR_REPLACE, handle);
1650         if (rc)
1651                 GOTO(stop, rc);
1652
1653         rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
1654                                    LU_XATTR_REPLACE, handle);
1655         if (rc)
1656                 GOTO(stop, rc);
1657
1658         rc = mdd_trans_start(env, mdd, handle);
1659         if (rc)
1660                 GOTO(stop, rc);
1661
1662         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
1663                            LU_XATTR_REPLACE, handle,
1664                            mdd_object_capa(env, fst_o));
1665         if (rc)
1666                 GOTO(stop, rc);
1667
1668         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
1669                            LU_XATTR_REPLACE, handle,
1670                            mdd_object_capa(env, snd_o));
1671         if (rc) {
1672                 int     rc2;
1673
1674                 /* failure on second file, but first was done, so we have
1675                  * to roll back first */
1676                 /* restore object_id, object_seq and generation number
1677                  * on first file */
1678                 if (fst_lmm) {
1679                         fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1680                         fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1681                         fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
1682                 }
1683
1684                 rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
1685                                     LU_XATTR_REPLACE, handle,
1686                                     mdd_object_capa(env, fst_o));
1687                 if (rc2) {
1688                         /* very bad day */
1689                         CERROR("%s: unable to roll back after swap layouts"
1690                                " failure between "DFID" and "DFID
1691                                " rc2 = %d rc = %d)\n",
1692                                mdd2obd_dev(mdd)->obd_name,
1693                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
1694                                rc2, rc);
1695                         /* a solution to avoid journal commit is to panic,
1696                          * but it has strong consequences so we use LBUG to
1697                          * allow sysdamin to choose to panic or not
1698                          */
1699                         LBUG();
1700                 }
1701                 GOTO(stop, rc);
1702         }
1703         EXIT;
1704
1705 stop:
1706         mdd_trans_stop(env, mdd, rc, handle);
1707 unlock:
1708         mdd_write_unlock(env, o2);
1709         mdd_write_unlock(env, o1);
1710
1711         if (lmm1_buf && lmm1_buf->lb_buf)
1712                 OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
1713         if (lmm1_buf)
1714                 OBD_FREE_PTR(lmm1_buf);
1715
1716         if (lmm2_buf && lmm2_buf->lb_buf)
1717                 OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
1718         if (lmm2_buf)
1719                 OBD_FREE_PTR(lmm2_buf);
1720
1721         if (old_fst_lmm)
1722                 OBD_FREE_PTR(old_fst_lmm);
1723
1724         return rc;
1725 }
1726
1727 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1728                 struct mdd_object *child, struct lu_attr *attr)
1729 {
1730         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1731         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1732         struct dt_object *nc = mdd_object_child(child);
1733
1734         /* @hint will be initialized by underlying device. */
1735         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1736 }
1737
1738 /*
1739  * do NOT or the MAY_*'s, you'll get the weakest
1740  */
1741 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1742 {
1743         int res = 0;
1744
1745         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1746          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1747          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1748          * owner can write to a file even if it is marked readonly to hide
1749          * its brokenness. (bug 5781) */
1750         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1751                 struct lu_ucred *uc = lu_ucred_check(env);
1752
1753                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1754                         return 0;
1755         }
1756
1757         if (flags & FMODE_READ)
1758                 res |= MAY_READ;
1759         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1760                 res |= MAY_WRITE;
1761         if (flags & MDS_FMODE_EXEC)
1762                 res = MAY_EXEC;
1763         return res;
1764 }
1765
1766 static int mdd_open_sanity_check(const struct lu_env *env,
1767                                  struct mdd_object *obj, int flag)
1768 {
1769         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1770         int mode, rc;
1771         ENTRY;
1772
1773         /* EEXIST check */
1774         if (mdd_is_dead_obj(obj))
1775                 RETURN(-ENOENT);
1776
1777         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1778         if (rc)
1779                RETURN(rc);
1780
1781         if (S_ISLNK(tmp_la->la_mode))
1782                 RETURN(-ELOOP);
1783
1784         mode = accmode(env, tmp_la, flag);
1785
1786         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1787                 RETURN(-EISDIR);
1788
1789         if (!(flag & MDS_OPEN_CREATED)) {
1790                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1791                 if (rc)
1792                         RETURN(rc);
1793         }
1794
1795         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1796             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1797                 flag &= ~MDS_OPEN_TRUNC;
1798
1799         /* For writing append-only file must open it with append mode. */
1800         if (mdd_is_append(obj)) {
1801                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1802                         RETURN(-EPERM);
1803                 if (flag & MDS_OPEN_TRUNC)
1804                         RETURN(-EPERM);
1805         }
1806
1807 #if 0
1808         /*
1809          * Now, flag -- O_NOATIME does not be packed by client.
1810          */
1811         if (flag & O_NOATIME) {
1812                 struct lu_ucred *uc = lu_ucred(env);
1813
1814                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1815                            (uc->uc_valid == UCRED_NEW)) &&
1816                     (uc->uc_fsuid != tmp_la->la_uid) &&
1817                     !md_capable(uc, CFS_CAP_FOWNER))
1818                         RETURN(-EPERM);
1819         }
1820 #endif
1821
1822         RETURN(0);
1823 }
1824
1825 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1826                     int flags)
1827 {
1828         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1829         int rc = 0;
1830
1831         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1832
1833         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1834         if (rc == 0)
1835                 mdd_obj->mod_count++;
1836
1837         mdd_write_unlock(env, mdd_obj);
1838         return rc;
1839 }
1840
1841 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1842                             struct md_attr *ma, struct thandle *handle)
1843 {
1844         return mdo_declare_destroy(env, obj, handle);
1845 }
1846
1847 /* return md_attr back,
1848  * if it is last unlink then return lov ea + llog cookie*/
1849 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1850                     struct md_attr *ma, struct thandle *handle)
1851 {
1852         int rc;
1853         ENTRY;
1854
1855         rc = mdo_destroy(env, obj, handle);
1856
1857         RETURN(rc);
1858 }
1859
1860 static int mdd_declare_close(const struct lu_env *env,
1861                              struct mdd_object *obj,
1862                              struct md_attr *ma,
1863                              struct thandle *handle)
1864 {
1865         int rc;
1866
1867         rc = orph_declare_index_delete(env, obj, handle);
1868         if (rc)
1869                 return rc;
1870
1871         return mdo_declare_destroy(env, obj, handle);
1872 }
1873
1874 /*
1875  * No permission check is needed.
1876  */
1877 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1878                      struct md_attr *ma, int mode)
1879 {
1880         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1881         struct mdd_device *mdd = mdo2mdd(obj);
1882         struct thandle    *handle = NULL;
1883         int rc, is_orphan = 0;
1884         ENTRY;
1885
1886         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1887                 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1888                 mdd_obj->mod_count--;
1889                 mdd_write_unlock(env, mdd_obj);
1890
1891                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1892                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1893                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1894                 RETURN(0);
1895         }
1896
1897         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
1898          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
1899         /* check without any lock */
1900         is_orphan = mdd_obj->mod_count == 1 &&
1901                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
1902
1903 again:
1904         if (is_orphan) {
1905                 handle = mdd_trans_create(env, mdo2mdd(obj));
1906                 if (IS_ERR(handle))
1907                         RETURN(PTR_ERR(handle));
1908
1909                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1910                 if (rc)
1911                         GOTO(stop, rc);
1912
1913                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1914                 if (rc)
1915                         GOTO(stop, rc);
1916
1917                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1918                 if (rc)
1919                         GOTO(stop, rc);
1920         }
1921
1922         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1923         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1924                         mdd_object_capa(env, mdd_obj));
1925         if (rc != 0) {
1926                 CERROR("Failed to get lu_attr of "DFID": %d\n",
1927                        PFID(mdd_object_fid(mdd_obj)), rc);
1928                 GOTO(out, rc);
1929         }
1930
1931         /* check again with lock */
1932         is_orphan = (mdd_obj->mod_count == 1) &&
1933                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
1934                      ma->ma_attr.la_nlink == 0);
1935
1936         if (is_orphan && handle == NULL) {
1937                 mdd_write_unlock(env, mdd_obj);
1938                 goto again;
1939         }
1940
1941         mdd_obj->mod_count--; /*release open count */
1942
1943         if (!is_orphan)
1944                 GOTO(out, rc = 0);
1945
1946         /* Orphan object */
1947         /* NB: Object maybe not in orphan list originally, it is rare case for
1948          * mdd_finish_unlink() failure, in that case, the object doesn't have
1949          * ORPHAN_OBJ flag */
1950         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1951                 /* remove link to object from orphan index */
1952                 LASSERT(handle != NULL);
1953                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1954                 if (rc != 0) {
1955                         CERROR("%s: unable to delete "DFID" from orphan list: "
1956                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1957                                PFID(mdd_object_fid(mdd_obj)), rc);
1958                         /* If object was not deleted from orphan list, do not
1959                          * destroy OSS objects, which will be done when next
1960                          * recovery. */
1961                         GOTO(out, rc);
1962                 }
1963
1964                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1965                        "list, OSS objects to be destroyed.\n",
1966                        PFID(mdd_object_fid(mdd_obj)));
1967         }
1968
1969         rc = mdo_destroy(env, mdd_obj, handle);
1970
1971         if (rc != 0) {
1972                 CERROR("%s: unable to delete "DFID" from orphan list: "
1973                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1974                        PFID(mdd_object_fid(mdd_obj)), rc);
1975         }
1976         EXIT;
1977
1978 out:
1979         mdd_write_unlock(env, mdd_obj);
1980
1981         if (rc == 0 &&
1982             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1983             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1984                 if (handle == NULL) {
1985                         handle = mdd_trans_create(env, mdo2mdd(obj));
1986                         if (IS_ERR(handle))
1987                                 GOTO(stop, rc = IS_ERR(handle));
1988
1989                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1990                                                          handle);
1991                         if (rc)
1992                                 GOTO(stop, rc);
1993
1994                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1995                         if (rc)
1996                                 GOTO(stop, rc);
1997                 }
1998
1999                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2000                                          mdd_obj, handle);
2001         }
2002
2003 stop:
2004         if (handle != NULL)
2005                 mdd_trans_stop(env, mdd, rc, handle);
2006         return rc;
2007 }
2008
2009 /*
2010  * Permission check is done when open,
2011  * no need check again.
2012  */
2013 static int mdd_readpage_sanity_check(const struct lu_env *env,
2014                                      struct mdd_object *obj)
2015 {
2016         struct dt_object *next = mdd_object_child(obj);
2017         int rc;
2018         ENTRY;
2019
2020         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2021                 rc = 0;
2022         else
2023                 rc = -ENOTDIR;
2024
2025         RETURN(rc);
2026 }
2027
2028 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2029                               int nob, const struct dt_it_ops *iops,
2030                               struct dt_it *it, __u32 attr, void *arg)
2031 {
2032         struct lu_dirpage       *dp = &lp->lp_dir;
2033         void                    *area = dp;
2034         int                      result;
2035         __u64                    hash = 0;
2036         struct lu_dirent        *ent;
2037         struct lu_dirent        *last = NULL;
2038         int                      first = 1;
2039
2040         memset(area, 0, sizeof (*dp));
2041         area += sizeof (*dp);
2042         nob  -= sizeof (*dp);
2043
2044         ent  = area;
2045         do {
2046                 int    len;
2047                 int    recsize;
2048
2049                 len = iops->key_size(env, it);
2050
2051                 /* IAM iterator can return record with zero len. */
2052                 if (len == 0)
2053                         goto next;
2054
2055                 hash = iops->store(env, it);
2056                 if (unlikely(first)) {
2057                         first = 0;
2058                         dp->ldp_hash_start = cpu_to_le64(hash);
2059                 }
2060
2061                 /* calculate max space required for lu_dirent */
2062                 recsize = lu_dirent_calc_size(len, attr);
2063
2064                 if (nob >= recsize) {
2065                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2066                         if (result == -ESTALE)
2067                                 goto next;
2068                         if (result != 0)
2069                                 goto out;
2070
2071                         /* osd might not able to pack all attributes,
2072                          * so recheck rec length */
2073                         recsize = le16_to_cpu(ent->lde_reclen);
2074                 } else {
2075                         result = (last != NULL) ? 0 :-EINVAL;
2076                         goto out;
2077                 }
2078                 last = ent;
2079                 ent = (void *)ent + recsize;
2080                 nob -= recsize;
2081
2082 next:
2083                 result = iops->next(env, it);
2084                 if (result == -ESTALE)
2085                         goto next;
2086         } while (result == 0);
2087
2088 out:
2089         dp->ldp_hash_end = cpu_to_le64(hash);
2090         if (last != NULL) {
2091                 if (last->lde_hash == dp->ldp_hash_end)
2092                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2093                 last->lde_reclen = 0; /* end mark */
2094         }
2095         if (result > 0)
2096                 /* end of directory */
2097                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2098         else if (result < 0)
2099                 CWARN("build page failed: %d!\n", result);
2100         return result;
2101 }
2102
2103 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2104                  const struct lu_rdpg *rdpg)
2105 {
2106         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2107         int rc;
2108         ENTRY;
2109
2110         if (mdd_object_exists(mdd_obj) == 0) {
2111                 CERROR("%s: object "DFID" not found: rc = -2\n",
2112                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2113                 return -ENOENT;
2114         }
2115
2116         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2117         rc = mdd_readpage_sanity_check(env, mdd_obj);
2118         if (rc)
2119                 GOTO(out_unlock, rc);
2120
2121         if (mdd_is_dead_obj(mdd_obj)) {
2122                 struct page *pg;
2123                 struct lu_dirpage *dp;
2124
2125                 /*
2126                  * According to POSIX, please do not return any entry to client:
2127                  * even dot and dotdot should not be returned.
2128                  */
2129                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2130                        PFID(mdd_object_fid(mdd_obj)));
2131
2132                 if (rdpg->rp_count <= 0)
2133                         GOTO(out_unlock, rc = -EFAULT);
2134                 LASSERT(rdpg->rp_pages != NULL);
2135
2136                 pg = rdpg->rp_pages[0];
2137                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2138                 memset(dp, 0 , sizeof(struct lu_dirpage));
2139                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2140                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2141                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2142                 cfs_kunmap(pg);
2143                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2144         }
2145
2146         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2147                            mdd_dir_page_build, NULL);
2148         if (rc >= 0) {
2149                 struct lu_dirpage       *dp;
2150
2151                 dp = cfs_kmap(rdpg->rp_pages[0]);
2152                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2153                 if (rc == 0) {
2154                         /*
2155                          * No pages were processed, mark this for first page
2156                          * and send back.
2157                          */
2158                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2159                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2160                 }
2161                 cfs_kunmap(rdpg->rp_pages[0]);
2162         }
2163
2164         GOTO(out_unlock, rc);
2165 out_unlock:
2166         mdd_read_unlock(env, mdd_obj);
2167         return rc;
2168 }
2169
2170 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2171 {
2172         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2173
2174         if (mdd_object_exists(mdd_obj) == 0) {
2175                 CERROR("%s: object "DFID" not found: rc = -2\n",
2176                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2177                 return -ENOENT;
2178         }
2179         return dt_object_sync(env, mdd_object_child(mdd_obj));
2180 }
2181
2182 static int mdd_object_lock(const struct lu_env *env,
2183                            struct md_object *obj,
2184                            struct lustre_handle *lh,
2185                            struct ldlm_enqueue_info *einfo,
2186                            void *policy)
2187 {
2188         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2189         LASSERT(mdd_object_exists(mdd_obj));
2190         return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
2191                               einfo, policy);
2192 }
2193
2194 const struct md_object_operations mdd_obj_ops = {
2195         .moo_permission         = mdd_permission,
2196         .moo_attr_get           = mdd_attr_get,
2197         .moo_attr_set           = mdd_attr_set,
2198         .moo_xattr_get          = mdd_xattr_get,
2199         .moo_xattr_set          = mdd_xattr_set,
2200         .moo_xattr_list         = mdd_xattr_list,
2201         .moo_xattr_del          = mdd_xattr_del,
2202         .moo_swap_layouts       = mdd_swap_layouts,
2203         .moo_open               = mdd_open,
2204         .moo_close              = mdd_close,
2205         .moo_readpage           = mdd_readpage,
2206         .moo_readlink           = mdd_readlink,
2207         .moo_changelog          = mdd_changelog,
2208         .moo_capa_get           = mdd_capa_get,
2209         .moo_object_sync        = mdd_object_sync,
2210         .moo_path               = mdd_path,
2211         .moo_object_lock        = mdd_object_lock,
2212 };