Whamcloud - gitweb
58447993ae70cf2eeac04df6f88f1deec8124fdc
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 const struct lu_name *mdd_name_get_const(const struct lu_env *env,
111                                          const void *area, ssize_t len)
112 {
113         struct lu_name *lname;
114
115         lname = &mdd_env_info(env)->mti_name;
116         lname->ln_name = area;
117         lname->ln_namelen = len;
118         return lname;
119 }
120
121 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
122 {
123         struct lu_buf *buf;
124
125         buf = &mdd_env_info(env)->mti_buf;
126         buf->lb_buf = area;
127         buf->lb_len = len;
128         return buf;
129 }
130
131 void mdd_buf_put(struct lu_buf *buf)
132 {
133         if (buf == NULL || buf->lb_buf == NULL)
134                 return;
135         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
136         *buf = LU_BUF_NULL;
137 }
138
139 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
140                                        const void *area, ssize_t len)
141 {
142         struct lu_buf *buf;
143
144         buf = &mdd_env_info(env)->mti_buf;
145         buf->lb_buf = (void *)area;
146         buf->lb_len = len;
147         return buf;
148 }
149
150 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
151 {
152         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
153
154         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
155                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
156                 *buf = LU_BUF_NULL;
157         }
158         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
159                 buf->lb_len = len;
160                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
161                 if (buf->lb_buf == NULL)
162                         *buf = LU_BUF_NULL;
163         }
164         return buf;
165 }
166
167 /** Increase the size of the \a mti_big_buf.
168  * preserves old data in buffer
169  * old buffer remains unchanged on error
170  * \retval 0 or -ENOMEM
171  */
172 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
173 {
174         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175         struct lu_buf buf;
176
177         LASSERT(len >= oldbuf->lb_len);
178         OBD_ALLOC_LARGE(buf.lb_buf, len);
179
180         if (buf.lb_buf == NULL)
181                 return -ENOMEM;
182
183         buf.lb_len = len;
184         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
185
186         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
187
188         memcpy(oldbuf, &buf, sizeof(buf));
189
190         return 0;
191 }
192
193 struct lu_object *mdd_object_alloc(const struct lu_env *env,
194                                    const struct lu_object_header *hdr,
195                                    struct lu_device *d)
196 {
197         struct mdd_object *mdd_obj;
198
199         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
200         if (mdd_obj != NULL) {
201                 struct lu_object *o;
202
203                 o = mdd2lu_obj(mdd_obj);
204                 lu_object_init(o, NULL, d);
205                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
206                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
207                 mdd_obj->mod_count = 0;
208                 o->lo_ops = &mdd_lu_obj_ops;
209                 return o;
210         } else {
211                 return NULL;
212         }
213 }
214
215 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
216                            const struct lu_object_conf *unused)
217 {
218         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
219         struct mdd_object *mdd_obj = lu2mdd_obj(o);
220         struct lu_object  *below;
221         struct lu_device  *under;
222         ENTRY;
223
224         mdd_obj->mod_cltime = 0;
225         under = &d->mdd_child->dd_lu_dev;
226         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
227         mdd_pdlock_init(mdd_obj);
228         if (IS_ERR(below))
229                 RETURN(PTR_ERR(below));
230
231         lu_object_add(o, below);
232
233         RETURN(0);
234 }
235
236 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
237 {
238         if (lu_object_exists(o))
239                 return mdd_get_flags(env, lu2mdd_obj(o));
240         else
241                 return 0;
242 }
243
244 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
245 {
246         struct mdd_object *mdd = lu2mdd_obj(o);
247
248         lu_object_fini(o);
249         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
250 }
251
252 static int mdd_object_print(const struct lu_env *env, void *cookie,
253                             lu_printer_t p, const struct lu_object *o)
254 {
255         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
256         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
257                     "valid=%x, cltime="LPU64", flags=%lx)",
258                     mdd, mdd->mod_count, mdd->mod_valid,
259                     mdd->mod_cltime, mdd->mod_flags);
260 }
261
262 static const struct lu_object_operations mdd_lu_obj_ops = {
263         .loo_object_init    = mdd_object_init,
264         .loo_object_start   = mdd_object_start,
265         .loo_object_free    = mdd_object_free,
266         .loo_object_print   = mdd_object_print,
267 };
268
269 struct mdd_object *mdd_object_find(const struct lu_env *env,
270                                    struct mdd_device *d,
271                                    const struct lu_fid *f)
272 {
273         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
274 }
275
276 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
277                         const char *path, struct lu_fid *fid)
278 {
279         struct lu_buf *buf;
280         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
281         struct mdd_object *obj;
282         struct lu_name *lname = &mdd_env_info(env)->mti_name;
283         char *name;
284         int rc = 0;
285         ENTRY;
286
287         /* temp buffer for path element */
288         buf = mdd_buf_alloc(env, PATH_MAX);
289         if (buf->lb_buf == NULL)
290                 RETURN(-ENOMEM);
291
292         lname->ln_name = name = buf->lb_buf;
293         lname->ln_namelen = 0;
294         *f = mdd->mdd_root_fid;
295
296         while(1) {
297                 while (*path == '/')
298                         path++;
299                 if (*path == '\0')
300                         break;
301                 while (*path != '/' && *path != '\0') {
302                         *name = *path;
303                         path++;
304                         name++;
305                         lname->ln_namelen++;
306                 }
307
308                 *name = '\0';
309                 /* find obj corresponding to fid */
310                 obj = mdd_object_find(env, mdd, f);
311                 if (obj == NULL)
312                         GOTO(out, rc = -EREMOTE);
313                 if (IS_ERR(obj))
314                         GOTO(out, rc = PTR_ERR(obj));
315                 /* get child fid from parent and name */
316                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
317                 mdd_object_put(env, obj);
318                 if (rc)
319                         break;
320
321                 name = buf->lb_buf;
322                 lname->ln_namelen = 0;
323         }
324
325         if (!rc)
326                 *fid = *f;
327 out:
328         RETURN(rc);
329 }
330
331 /** The maximum depth that fid2path() will search.
332  * This is limited only because we want to store the fids for
333  * historical path lookup purposes.
334  */
335 #define MAX_PATH_DEPTH 100
336
337 /** mdd_path() lookup structure. */
338 struct path_lookup_info {
339         __u64                pli_recno;        /**< history point */
340         __u64                pli_currec;       /**< current record */
341         struct lu_fid        pli_fid;
342         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
343         struct mdd_object   *pli_mdd_obj;
344         char                *pli_path;         /**< full path */
345         int                  pli_pathlen;
346         int                  pli_linkno;       /**< which hardlink to follow */
347         int                  pli_fidcount;     /**< number of \a pli_fids */
348 };
349
350 static int mdd_path_current(const struct lu_env *env,
351                             struct path_lookup_info *pli)
352 {
353         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
354         struct mdd_object *mdd_obj;
355         struct lu_buf     *buf = NULL;
356         struct link_ea_header *leh;
357         struct link_ea_entry  *lee;
358         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
359         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
360         char *ptr;
361         int reclen;
362         int rc;
363         ENTRY;
364
365         ptr = pli->pli_path + pli->pli_pathlen - 1;
366         *ptr = 0;
367         --ptr;
368         pli->pli_fidcount = 0;
369         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
370
371         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
372                 mdd_obj = mdd_object_find(env, mdd,
373                                           &pli->pli_fids[pli->pli_fidcount]);
374                 if (mdd_obj == NULL)
375                         GOTO(out, rc = -EREMOTE);
376                 if (IS_ERR(mdd_obj))
377                         GOTO(out, rc = PTR_ERR(mdd_obj));
378                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
379                 if (rc <= 0) {
380                         mdd_object_put(env, mdd_obj);
381                         if (rc == -1)
382                                 rc = -EREMOTE;
383                         else if (rc == 0)
384                                 /* Do I need to error out here? */
385                                 rc = -ENOENT;
386                         GOTO(out, rc);
387                 }
388
389                 /* Get parent fid and object name */
390                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
391                 buf = mdd_links_get(env, mdd_obj);
392                 mdd_read_unlock(env, mdd_obj);
393                 mdd_object_put(env, mdd_obj);
394                 if (IS_ERR(buf))
395                         GOTO(out, rc = PTR_ERR(buf));
396
397                 leh = buf->lb_buf;
398                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
399                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
400
401                 /* If set, use link #linkno for path lookup, otherwise use
402                    link #0.  Only do this for the final path element. */
403                 if ((pli->pli_fidcount == 0) &&
404                     (pli->pli_linkno < leh->leh_reccount)) {
405                         int count;
406                         for (count = 0; count < pli->pli_linkno; count++) {
407                                 lee = (struct link_ea_entry *)
408                                      ((char *)lee + reclen);
409                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
410                         }
411                         if (pli->pli_linkno < leh->leh_reccount - 1)
412                                 /* indicate to user there are more links */
413                                 pli->pli_linkno++;
414                 }
415
416                 /* Pack the name in the end of the buffer */
417                 ptr -= tmpname->ln_namelen;
418                 if (ptr - 1 <= pli->pli_path)
419                         GOTO(out, rc = -EOVERFLOW);
420                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
421                 *(--ptr) = '/';
422
423                 /* Store the parent fid for historic lookup */
424                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
425                         GOTO(out, rc = -EOVERFLOW);
426                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
427         }
428
429         /* Verify that our path hasn't changed since we started the lookup.
430            Record the current index, and verify the path resolves to the
431            same fid. If it does, then the path is correct as of this index. */
432         spin_lock(&mdd->mdd_cl.mc_lock);
433         pli->pli_currec = mdd->mdd_cl.mc_index;
434         spin_unlock(&mdd->mdd_cl.mc_lock);
435         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
436         if (rc) {
437                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
438                 GOTO (out, rc = -EAGAIN);
439         }
440         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
441                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
442                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
443                        PFID(&pli->pli_fid));
444                 GOTO(out, rc = -EAGAIN);
445         }
446         ptr++; /* skip leading / */
447         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
448
449         EXIT;
450 out:
451         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
452                 /* if we vmalloced a large buffer drop it */
453                 mdd_buf_put(buf);
454
455         return rc;
456 }
457
458 static int mdd_path_historic(const struct lu_env *env,
459                              struct path_lookup_info *pli)
460 {
461         return 0;
462 }
463
464 /* Returns the full path to this fid, as of changelog record recno. */
465 static int mdd_path(const struct lu_env *env, struct md_object *obj,
466                     char *path, int pathlen, __u64 *recno, int *linkno)
467 {
468         struct path_lookup_info *pli;
469         int tries = 3;
470         int rc = -EAGAIN;
471         ENTRY;
472
473         if (pathlen < 3)
474                 RETURN(-EOVERFLOW);
475
476         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
477                 path[0] = '\0';
478                 RETURN(0);
479         }
480
481         OBD_ALLOC_PTR(pli);
482         if (pli == NULL)
483                 RETURN(-ENOMEM);
484
485         pli->pli_mdd_obj = md2mdd_obj(obj);
486         pli->pli_recno = *recno;
487         pli->pli_path = path;
488         pli->pli_pathlen = pathlen;
489         pli->pli_linkno = *linkno;
490
491         /* Retry multiple times in case file is being moved */
492         while (tries-- && rc == -EAGAIN)
493                 rc = mdd_path_current(env, pli);
494
495         /* For historical path lookup, the current links may not have existed
496          * at "recno" time.  We must switch over to earlier links/parents
497          * by using the changelog records.  If the earlier parent doesn't
498          * exist, we must search back through the changelog to reconstruct
499          * its parents, then check if it exists, etc.
500          * We may ignore this problem for the initial implementation and
501          * state that an "original" hardlink must still exist for us to find
502          * historic path name. */
503         if (pli->pli_recno != -1) {
504                 rc = mdd_path_historic(env, pli);
505         } else {
506                 *recno = pli->pli_currec;
507                 /* Return next link index to caller */
508                 *linkno = pli->pli_linkno;
509         }
510
511         OBD_FREE_PTR(pli);
512
513         RETURN (rc);
514 }
515
516 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
517 {
518         struct lu_attr *la = &mdd_env_info(env)->mti_la;
519         int rc;
520
521         ENTRY;
522         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
523         if (rc == 0) {
524                 mdd_flags_xlate(obj, la->la_flags);
525         }
526         RETURN(rc);
527 }
528
529 /*
530  * No permission check is needed.
531  */
532 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
533                  struct md_attr *ma)
534 {
535         struct mdd_object *mdd_obj = md2mdd_obj(obj);
536         int               rc;
537
538         ENTRY;
539
540         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
541                         mdd_object_capa(env, md2mdd_obj(obj)));
542         if ((ma->ma_need & MA_INODE) != 0 && mdd_is_dead_obj(mdd_obj))
543                 ma->ma_attr.la_nlink = 0;
544
545         RETURN(rc);
546 }
547
548 /*
549  * No permission check is needed.
550  */
551 static int mdd_xattr_get(const struct lu_env *env,
552                          struct md_object *obj, struct lu_buf *buf,
553                          const char *name)
554 {
555         struct mdd_object *mdd_obj = md2mdd_obj(obj);
556         int rc;
557
558         ENTRY;
559
560         if (mdd_object_exists(mdd_obj) == 0) {
561                 CERROR("%s: object "DFID" not found: rc = -2\n",
562                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
563                 return -ENOENT;
564         }
565
566         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
567         rc = mdo_xattr_get(env, mdd_obj, buf, name,
568                            mdd_object_capa(env, mdd_obj));
569         mdd_read_unlock(env, mdd_obj);
570
571         RETURN(rc);
572 }
573
574 /*
575  * Permission check is done when open,
576  * no need check again.
577  */
578 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
579                         struct lu_buf *buf)
580 {
581         struct mdd_object *mdd_obj = md2mdd_obj(obj);
582         struct dt_object  *next;
583         loff_t             pos = 0;
584         int                rc;
585         ENTRY;
586
587         if (mdd_object_exists(mdd_obj) == 0) {
588                 CERROR("%s: object "DFID" not found: rc = -2\n",
589                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
590                 return -ENOENT;
591         }
592
593         next = mdd_object_child(mdd_obj);
594         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
595         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
596                                          mdd_object_capa(env, mdd_obj));
597         mdd_read_unlock(env, mdd_obj);
598         RETURN(rc);
599 }
600
601 /*
602  * No permission check is needed.
603  */
604 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
605                           struct lu_buf *buf)
606 {
607         struct mdd_object *mdd_obj = md2mdd_obj(obj);
608         int rc;
609
610         ENTRY;
611
612         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
613         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
614         mdd_read_unlock(env, mdd_obj);
615
616         RETURN(rc);
617 }
618
619 int mdd_declare_object_create_internal(const struct lu_env *env,
620                                        struct mdd_object *p,
621                                        struct mdd_object *c,
622                                        struct lu_attr *attr,
623                                        struct thandle *handle,
624                                        const struct md_op_spec *spec)
625 {
626         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
627         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
628         const struct dt_index_features *feat = spec->sp_feat;
629         int rc;
630         ENTRY;
631
632         if (feat != &dt_directory_features && feat != NULL) {
633                 dof->dof_type = DFT_INDEX;
634                 dof->u.dof_idx.di_feat = feat;
635
636         } else {
637                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
638                 if (dof->dof_type == DFT_REGULAR) {
639                         dof->u.dof_reg.striped =
640                                 md_should_create(spec->sp_cr_flags);
641                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
642                                 dof->u.dof_reg.striped = 0;
643                         /* is this replay? */
644                         if (spec->no_create)
645                                 dof->u.dof_reg.striped = 0;
646                 }
647         }
648
649         rc = mdo_declare_create_obj(env, c, attr, hint, dof, handle);
650
651         RETURN(rc);
652 }
653
654 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
655                                struct mdd_object *c, struct lu_attr *attr,
656                                struct thandle *handle,
657                                const struct md_op_spec *spec)
658 {
659         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
660         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
661         int rc;
662         ENTRY;
663
664         LASSERT(!mdd_object_exists(c));
665
666         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
667
668         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
669
670         RETURN(rc);
671 }
672
673 /**
674  * Make sure the ctime is increased only.
675  */
676 static inline int mdd_attr_check(const struct lu_env *env,
677                                  struct mdd_object *obj,
678                                  struct lu_attr *attr)
679 {
680         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
681         int rc;
682         ENTRY;
683
684         if (attr->la_valid & LA_CTIME) {
685                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
686                 if (rc)
687                         RETURN(rc);
688
689                 if (attr->la_ctime < tmp_la->la_ctime)
690                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
691                 else if (attr->la_valid == LA_CTIME &&
692                          attr->la_ctime == tmp_la->la_ctime)
693                         attr->la_valid &= ~LA_CTIME;
694         }
695         RETURN(0);
696 }
697
698 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
699                           struct lu_attr *attr, struct thandle *handle,
700                           int needacl)
701 {
702         int rc;
703         ENTRY;
704
705         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
706 #ifdef CONFIG_FS_POSIX_ACL
707         if (!rc && (attr->la_valid & LA_MODE) && needacl)
708                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
709 #endif
710         RETURN(rc);
711 }
712
713 int mdd_attr_check_set_internal(const struct lu_env *env,
714                                 struct mdd_object *obj, struct lu_attr *attr,
715                                 struct thandle *handle, int needacl)
716 {
717         int rc;
718         ENTRY;
719
720         rc = mdd_attr_check(env, obj, attr);
721         if (rc)
722                 RETURN(rc);
723
724         if (attr->la_valid)
725                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
726         RETURN(rc);
727 }
728
729 /*
730  * This gives the same functionality as the code between
731  * sys_chmod and inode_setattr
732  * chown_common and inode_setattr
733  * utimes and inode_setattr
734  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
735  */
736 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
737                         struct lu_attr *la, const unsigned long flags)
738 {
739         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
740         struct lu_ucred  *uc;
741         int               rc;
742         ENTRY;
743
744         if (!la->la_valid)
745                 RETURN(0);
746
747         /* Do not permit change file type */
748         if (la->la_valid & LA_TYPE)
749                 RETURN(-EPERM);
750
751         /* They should not be processed by setattr */
752         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
753                 RETURN(-EPERM);
754
755         /* export destroy does not have ->le_ses, but we may want
756          * to drop LUSTRE_SOM_FL. */
757         uc = lu_ucred_check(env);
758         if (uc == NULL)
759                 RETURN(0);
760
761         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
762         if (rc)
763                 RETURN(rc);
764
765         if (la->la_valid == LA_CTIME) {
766                 if (!(flags & MDS_PERM_BYPASS))
767                         /* This is only for set ctime when rename's source is
768                          * on remote MDS. */
769                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
770                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
771                         la->la_valid &= ~LA_CTIME;
772                 RETURN(rc);
773         }
774
775         if (la->la_valid == LA_ATIME) {
776                 /* This is atime only set for read atime update on close. */
777                 if (la->la_atime >= tmp_la->la_atime &&
778                     la->la_atime < (tmp_la->la_atime +
779                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
780                         la->la_valid &= ~LA_ATIME;
781                 RETURN(0);
782         }
783
784         /* Check if flags change. */
785         if (la->la_valid & LA_FLAGS) {
786                 unsigned int oldflags = 0;
787                 unsigned int newflags = la->la_flags &
788                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
789
790                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
791                     !md_capable(uc, CFS_CAP_FOWNER))
792                         RETURN(-EPERM);
793
794                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
795                  * only be changed by the relevant capability. */
796                 if (mdd_is_immutable(obj))
797                         oldflags |= LUSTRE_IMMUTABLE_FL;
798                 if (mdd_is_append(obj))
799                         oldflags |= LUSTRE_APPEND_FL;
800                 if ((oldflags ^ newflags) &&
801                     !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
802                         RETURN(-EPERM);
803
804                 if (!S_ISDIR(tmp_la->la_mode))
805                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
806         }
807
808         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
809             (la->la_valid & ~LA_FLAGS) &&
810             !(flags & MDS_PERM_BYPASS))
811                 RETURN(-EPERM);
812
813         /* Check for setting the obj time. */
814         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
815             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
816                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
817                     !md_capable(uc, CFS_CAP_FOWNER)) {
818                         rc = mdd_permission_internal(env, obj, tmp_la,
819                                                      MAY_WRITE);
820                         if (rc)
821                                 RETURN(rc);
822                 }
823         }
824
825         if (la->la_valid & LA_KILL_SUID) {
826                 la->la_valid &= ~LA_KILL_SUID;
827                 if ((tmp_la->la_mode & S_ISUID) &&
828                     !(la->la_valid & LA_MODE)) {
829                         la->la_mode = tmp_la->la_mode;
830                         la->la_valid |= LA_MODE;
831                 }
832                 la->la_mode &= ~S_ISUID;
833         }
834
835         if (la->la_valid & LA_KILL_SGID) {
836                 la->la_valid &= ~LA_KILL_SGID;
837                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
838                                         (S_ISGID | S_IXGRP)) &&
839                     !(la->la_valid & LA_MODE)) {
840                         la->la_mode = tmp_la->la_mode;
841                         la->la_valid |= LA_MODE;
842                 }
843                 la->la_mode &= ~S_ISGID;
844         }
845
846         /* Make sure a caller can chmod. */
847         if (la->la_valid & LA_MODE) {
848                 if (!(flags & MDS_PERM_BYPASS) &&
849                     (uc->uc_fsuid != tmp_la->la_uid) &&
850                     !md_capable(uc, CFS_CAP_FOWNER))
851                         RETURN(-EPERM);
852
853                 if (la->la_mode == (cfs_umode_t) -1)
854                         la->la_mode = tmp_la->la_mode;
855                 else
856                         la->la_mode = (la->la_mode & S_IALLUGO) |
857                                       (tmp_la->la_mode & ~S_IALLUGO);
858
859                 /* Also check the setgid bit! */
860                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
861                                        la->la_gid : tmp_la->la_gid) &&
862                     !md_capable(uc, CFS_CAP_FSETID))
863                         la->la_mode &= ~S_ISGID;
864         } else {
865                la->la_mode = tmp_la->la_mode;
866         }
867
868         /* Make sure a caller can chown. */
869         if (la->la_valid & LA_UID) {
870                 if (la->la_uid == (uid_t) -1)
871                         la->la_uid = tmp_la->la_uid;
872                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
873                      (la->la_uid != tmp_la->la_uid)) &&
874                     !md_capable(uc, CFS_CAP_CHOWN))
875                         RETURN(-EPERM);
876
877                 /* If the user or group of a non-directory has been
878                  * changed by a non-root user, remove the setuid bit.
879                  * 19981026 David C Niemi <niemi@tux.org>
880                  *
881                  * Changed this to apply to all users, including root,
882                  * to avoid some races. This is the behavior we had in
883                  * 2.0. The check for non-root was definitely wrong
884                  * for 2.2 anyway, as it should have been using
885                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
886                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
887                     !S_ISDIR(tmp_la->la_mode)) {
888                         la->la_mode &= ~S_ISUID;
889                         la->la_valid |= LA_MODE;
890                 }
891         }
892
893         /* Make sure caller can chgrp. */
894         if (la->la_valid & LA_GID) {
895                 if (la->la_gid == (gid_t) -1)
896                         la->la_gid = tmp_la->la_gid;
897                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
898                      ((la->la_gid != tmp_la->la_gid) &&
899                       !lustre_in_group_p(uc, la->la_gid))) &&
900                     !md_capable(uc, CFS_CAP_CHOWN))
901                         RETURN(-EPERM);
902
903                 /* Likewise, if the user or group of a non-directory
904                  * has been changed by a non-root user, remove the
905                  * setgid bit UNLESS there is no group execute bit
906                  * (this would be a file marked for mandatory
907                  * locking).  19981026 David C Niemi <niemi@tux.org>
908                  *
909                  * Removed the fsuid check (see the comment above) --
910                  * 19990830 SD. */
911                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
912                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
913                         la->la_mode &= ~S_ISGID;
914                         la->la_valid |= LA_MODE;
915                 }
916         }
917
918         /* For both Size-on-MDS case and truncate case,
919          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
920          * We distinguish them by "flags & MDS_SOM".
921          * For SOM case, it is true, the MAY_WRITE perm has been checked
922          * when open, no need check again. For truncate case, it is false,
923          * the MAY_WRITE perm should be checked here. */
924         if (flags & MDS_SOM) {
925                 /* For the "Size-on-MDS" setattr update, merge coming
926                  * attributes with the set in the inode. BUG 10641 */
927                 if ((la->la_valid & LA_ATIME) &&
928                     (la->la_atime <= tmp_la->la_atime))
929                         la->la_valid &= ~LA_ATIME;
930
931                 /* OST attributes do not have a priority over MDS attributes,
932                  * so drop times if ctime is equal. */
933                 if ((la->la_valid & LA_CTIME) &&
934                     (la->la_ctime <= tmp_la->la_ctime))
935                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
936         } else {
937                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
938                         if (!((flags & MDS_OWNEROVERRIDE) &&
939                               (uc->uc_fsuid == tmp_la->la_uid)) &&
940                             !(flags & MDS_PERM_BYPASS)) {
941                                 rc = mdd_permission_internal(env, obj,
942                                                              tmp_la, MAY_WRITE);
943                                 if (rc != 0)
944                                         RETURN(rc);
945                         }
946                 }
947                 if (la->la_valid & LA_CTIME) {
948                         /* The pure setattr, it has the priority over what is
949                          * already set, do not drop it if ctime is equal. */
950                         if (la->la_ctime < tmp_la->la_ctime)
951                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
952                                                   LA_CTIME);
953                 }
954         }
955
956         RETURN(0);
957 }
958
959 /** Store a data change changelog record
960  * If this fails, we must fail the whole transaction; we don't
961  * want the change to commit without the log entry.
962  * \param mdd_obj - mdd_object of change
963  * \param handle - transacion handle
964  */
965 static int mdd_changelog_data_store(const struct lu_env *env,
966                                     struct mdd_device *mdd,
967                                     enum changelog_rec_type type,
968                                     int flags, struct mdd_object *mdd_obj,
969                                     struct thandle *handle)
970 {
971         const struct lu_fid             *tfid;
972         struct llog_changelog_rec       *rec;
973         struct lu_buf                   *buf;
974         int                              reclen;
975         int                              rc;
976
977         /* Not recording */
978         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
979                 RETURN(0);
980         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
981                 RETURN(0);
982
983         LASSERT(mdd_obj != NULL);
984         LASSERT(handle != NULL);
985
986         tfid = mdo2fid(mdd_obj);
987
988         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
989             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
990                 /* Don't need multiple updates in this log */
991                 /* Don't check under lock - no big deal if we get an extra
992                    entry */
993                 RETURN(0);
994         }
995
996         reclen = llog_data_len(sizeof(*rec));
997         buf = mdd_buf_alloc(env, reclen);
998         if (buf->lb_buf == NULL)
999                 RETURN(-ENOMEM);
1000         rec = buf->lb_buf;
1001
1002         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1003         rec->cr.cr_type = (__u32)type;
1004         rec->cr.cr_tfid = *tfid;
1005         rec->cr.cr_namelen = 0;
1006         mdd_obj->mod_cltime = cfs_time_current_64();
1007
1008         rc = mdd_changelog_store(env, mdd, rec, handle);
1009
1010         RETURN(rc);
1011 }
1012
1013 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1014                   int flags, struct md_object *obj)
1015 {
1016         struct thandle *handle;
1017         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1018         struct mdd_device *mdd = mdo2mdd(obj);
1019         int rc;
1020         ENTRY;
1021
1022         handle = mdd_trans_create(env, mdd);
1023         if (IS_ERR(handle))
1024                 RETURN(PTR_ERR(handle));
1025
1026         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1027         if (rc)
1028                 GOTO(stop, rc);
1029
1030         rc = mdd_trans_start(env, mdd, handle);
1031         if (rc)
1032                 GOTO(stop, rc);
1033
1034         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1035                                       handle);
1036
1037 stop:
1038         mdd_trans_stop(env, mdd, rc, handle);
1039
1040         RETURN(rc);
1041 }
1042
1043 /**
1044  * Save LMA extended attributes with data from \a ma.
1045  *
1046  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1047  * not, LMA EA will be first read from disk, modified and write back.
1048  *
1049  */
1050 /* Precedence for choosing record type when multiple
1051  * attributes change: setattr > mtime > ctime > atime
1052  * (ctime changes when mtime does, plus chmod/chown.
1053  * atime and ctime are independent.) */
1054 static int mdd_attr_set_changelog(const struct lu_env *env,
1055                                   struct md_object *obj, struct thandle *handle,
1056                                   __u64 valid)
1057 {
1058         struct mdd_device *mdd = mdo2mdd(obj);
1059         int bits, type = 0;
1060
1061         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1062         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1063         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1064         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1065         bits = bits & mdd->mdd_cl.mc_mask;
1066         if (bits == 0)
1067                 return 0;
1068
1069         /* The record type is the lowest non-masked set bit */
1070         while (bits && ((bits & 1) == 0)) {
1071                 bits = bits >> 1;
1072                 type++;
1073         }
1074
1075         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1076         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1077                                         md2mdd_obj(obj), handle);
1078 }
1079
1080 static int mdd_declare_attr_set(const struct lu_env *env,
1081                                 struct mdd_device *mdd,
1082                                 struct mdd_object *obj,
1083                                 const struct lu_attr *attr,
1084                                 struct thandle *handle)
1085 {
1086         int rc;
1087
1088         rc = mdo_declare_attr_set(env, obj, attr, handle);
1089         if (rc)
1090                 return rc;
1091
1092 #ifdef CONFIG_FS_POSIX_ACL
1093         if (attr->la_valid & LA_MODE) {
1094                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1095                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1096                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1097                 mdd_read_unlock(env, obj);
1098                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1099                         rc = 0;
1100                 else if (rc < 0)
1101                         return rc;
1102
1103                 if (rc != 0) {
1104                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1105                         rc = mdo_declare_xattr_set(env, obj, buf,
1106                                                    XATTR_NAME_ACL_ACCESS, 0,
1107                                                    handle);
1108                         if (rc)
1109                                 return rc;
1110                 }
1111         }
1112 #endif
1113
1114         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1115         return rc;
1116 }
1117
1118 /* set attr and LOV EA at once, return updated attr */
1119 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1120                  const struct md_attr *ma)
1121 {
1122         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1123         struct mdd_device *mdd = mdo2mdd(obj);
1124         struct thandle *handle;
1125         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1126         const struct lu_attr *la = &ma->ma_attr;
1127         int rc;
1128         ENTRY;
1129
1130         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1131         LASSERT((ma->ma_valid & MA_LOV) == 0);
1132         LASSERT((ma->ma_valid & MA_HSM) == 0);
1133         LASSERT((ma->ma_valid & MA_SOM) == 0);
1134
1135         *la_copy = ma->ma_attr;
1136         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1137         if (rc)
1138                 RETURN(rc);
1139
1140         /* setattr on "close" only change atime, or do nothing */
1141         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1142                 RETURN(0);
1143
1144         handle = mdd_trans_create(env, mdd);
1145         if (IS_ERR(handle))
1146                 RETURN(PTR_ERR(handle));
1147
1148         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1149         if (rc)
1150                 GOTO(stop, rc);
1151
1152         rc = mdd_trans_start(env, mdd, handle);
1153         if (rc)
1154                 GOTO(stop, rc);
1155
1156         /* permission changes may require sync operation */
1157         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1158                 handle->th_sync |= !!mdd->mdd_sync_permission;
1159
1160         if (la->la_valid & (LA_MTIME | LA_CTIME))
1161                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1162                        la->la_mtime, la->la_ctime);
1163
1164         if (la_copy->la_valid & LA_FLAGS) {
1165                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1166                 if (rc == 0)
1167                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1168         } else if (la_copy->la_valid) {            /* setattr */
1169                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1170         }
1171
1172         if (rc == 0)
1173                 rc = mdd_attr_set_changelog(env, obj, handle,
1174                                             la->la_valid);
1175 stop:
1176         mdd_trans_stop(env, mdd, rc, handle);
1177         RETURN(rc);
1178 }
1179
1180 static int mdd_xattr_sanity_check(const struct lu_env *env,
1181                                   struct mdd_object *obj)
1182 {
1183         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1184         struct lu_ucred *uc     = lu_ucred_assert(env);
1185         int rc;
1186         ENTRY;
1187
1188         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1189                 RETURN(-EPERM);
1190
1191         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1192         if (rc)
1193                 RETURN(rc);
1194
1195         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1196             !md_capable(uc, CFS_CAP_FOWNER))
1197                 RETURN(-EPERM);
1198
1199         RETURN(rc);
1200 }
1201
1202 static int mdd_declare_xattr_set(const struct lu_env *env,
1203                                  struct mdd_device *mdd,
1204                                  struct mdd_object *obj,
1205                                  const struct lu_buf *buf,
1206                                  const char *name,
1207                                  int fl, struct thandle *handle)
1208 {
1209         int     rc;
1210
1211         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1212         if (rc)
1213                 return rc;
1214
1215         /* Only record user xattr changes */
1216         if ((strncmp("user.", name, 5) == 0)) {
1217                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1218                 if (rc)
1219                         return rc;
1220         }
1221
1222         /* If HSM data is modified, this could add a changelog */
1223         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0)
1224                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1225
1226         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1227         return rc;
1228 }
1229
1230 /*
1231  * Compare current and future data of HSM EA and add a changelog if needed.
1232  *
1233  * Caller should have write-locked \param obj.
1234  *
1235  * \param buf - Future HSM EA content.
1236  * \retval 0 if no changelog is needed or changelog was added properly.
1237  * \retval -ve errno if there was a problem
1238  */
1239 static int mdd_hsm_update_locked(const struct lu_env *env,
1240                                  struct md_object *obj,
1241                                  const struct lu_buf *buf,
1242                                  struct thandle *handle)
1243 {
1244         struct mdd_thread_info *info = mdd_env_info(env);
1245         struct mdd_device      *mdd = mdo2mdd(obj);
1246         struct mdd_object      *mdd_obj = md2mdd_obj(obj);
1247         struct lu_buf          *current_buf = &info->mti_buf;
1248         struct md_hsm          *current_mh;
1249         struct md_hsm          *new_mh;
1250         int                     rc;
1251         ENTRY;
1252
1253         OBD_ALLOC_PTR(current_mh);
1254         if (current_mh == NULL)
1255                 RETURN(-ENOMEM);
1256
1257         /* Read HSM attrs from disk */
1258         current_buf->lb_buf = info->mti_xattr_buf;
1259         current_buf->lb_len = sizeof(info->mti_xattr_buf);
1260         CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
1261         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM,
1262                            mdd_object_capa(env, mdd_obj));
1263         rc = lustre_buf2hsm(info->mti_xattr_buf, rc, current_mh);
1264         if (rc < 0 && rc != -ENODATA)
1265                 GOTO(free, rc);
1266         else if (rc == -ENODATA)
1267                 current_mh->mh_flags = 0;
1268
1269         /* Map future HSM xattr */
1270         OBD_ALLOC_PTR(new_mh);
1271         if (new_mh == NULL)
1272                 GOTO(free, rc = -ENOMEM);
1273         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1274
1275         /* If HSM flags are different, add a changelog */
1276         rc = 0;
1277         if (current_mh->mh_flags != new_mh->mh_flags) {
1278                 int flags = 0;
1279                 hsm_set_cl_event(&flags, HE_STATE);
1280                 if (new_mh->mh_flags & HS_DIRTY)
1281                         hsm_set_cl_flags(&flags, CLF_HSM_DIRTY);
1282
1283                 rc = mdd_changelog_data_store(env, mdd, CL_HSM, flags, mdd_obj,
1284                                               handle);
1285         }
1286
1287         OBD_FREE_PTR(new_mh);
1288 free:
1289         OBD_FREE_PTR(current_mh);
1290         return(rc);
1291 }
1292
1293
1294 /**
1295  * The caller should guarantee to update the object ctime
1296  * after xattr_set if needed.
1297  */
1298 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1299                          const struct lu_buf *buf, const char *name,
1300                          int fl)
1301 {
1302         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1303         struct mdd_device       *mdd = mdo2mdd(obj);
1304         struct thandle          *handle;
1305         int                      rc;
1306         ENTRY;
1307
1308         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1309                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1310                 RETURN(rc);
1311         }
1312
1313         rc = mdd_xattr_sanity_check(env, mdd_obj);
1314         if (rc)
1315                 RETURN(rc);
1316
1317         handle = mdd_trans_create(env, mdd);
1318         if (IS_ERR(handle))
1319                 RETURN(PTR_ERR(handle));
1320
1321         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, 0, handle);
1322         if (rc)
1323                 GOTO(stop, rc);
1324
1325         rc = mdd_trans_start(env, mdd, handle);
1326         if (rc)
1327                 GOTO(stop, rc);
1328
1329         /* security-replated changes may require sync */
1330         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1331                 handle->th_sync |= !!mdd->mdd_sync_permission;
1332
1333         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1334
1335         if (strncmp(XATTR_NAME_HSM, name, sizeof(XATTR_NAME_HSM) - 1) == 0) {
1336                 rc = mdd_hsm_update_locked(env, obj, buf, handle);
1337                 if (rc) {
1338                         mdd_write_unlock(env, mdd_obj);
1339                         GOTO(stop, rc);
1340                 }
1341         }
1342
1343         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1344                            mdd_object_capa(env, mdd_obj));
1345         mdd_write_unlock(env, mdd_obj);
1346         if (rc)
1347                 GOTO(stop, rc);
1348
1349         /* Only record system & user xattr changes */
1350         if (strncmp(XATTR_USER_PREFIX, name,
1351                         sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1352             strncmp(POSIX_ACL_XATTR_ACCESS, name,
1353                         sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1354             strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1355                         sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1356                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1357                                               handle);
1358
1359 stop:
1360         mdd_trans_stop(env, mdd, rc, handle);
1361
1362         RETURN(rc);
1363 }
1364
1365 static int mdd_declare_xattr_del(const struct lu_env *env,
1366                                  struct mdd_device *mdd,
1367                                  struct mdd_object *obj,
1368                                  const char *name,
1369                                  struct thandle *handle)
1370 {
1371         int rc;
1372
1373         rc = mdo_declare_xattr_del(env, obj, name, handle);
1374         if (rc)
1375                 return rc;
1376
1377         /* Only record user xattr changes */
1378         if ((strncmp("user.", name, 5) == 0))
1379                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1380
1381         return rc;
1382 }
1383
1384 /**
1385  * The caller should guarantee to update the object ctime
1386  * after xattr_set if needed.
1387  */
1388 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1389                   const char *name)
1390 {
1391         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1392         struct mdd_device *mdd = mdo2mdd(obj);
1393         struct thandle *handle;
1394         int  rc;
1395         ENTRY;
1396
1397         rc = mdd_xattr_sanity_check(env, mdd_obj);
1398         if (rc)
1399                 RETURN(rc);
1400
1401         handle = mdd_trans_create(env, mdd);
1402         if (IS_ERR(handle))
1403                 RETURN(PTR_ERR(handle));
1404
1405         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1406         if (rc)
1407                 GOTO(stop, rc);
1408
1409         rc = mdd_trans_start(env, mdd, handle);
1410         if (rc)
1411                 GOTO(stop, rc);
1412
1413         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1414         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1415                            mdd_object_capa(env, mdd_obj));
1416         mdd_write_unlock(env, mdd_obj);
1417         if (rc)
1418                 GOTO(stop, rc);
1419
1420         /* Only record system & user xattr changes */
1421         if (strncmp(XATTR_USER_PREFIX, name,
1422                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1423                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1424                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1425                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1426                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1427                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1428                                               handle);
1429
1430 stop:
1431         mdd_trans_stop(env, mdd, rc, handle);
1432
1433         RETURN(rc);
1434 }
1435
1436 /*
1437  * read lov EA of an object
1438  * return the lov EA in an allocated lu_buf
1439  */
1440 static struct lu_buf *mdd_get_lov_ea(const struct lu_env *env,
1441                                      struct mdd_object *obj)
1442 {
1443         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1444         struct lu_buf   *lmm_buf = NULL;
1445         int              rc, sz;
1446         ENTRY;
1447
1448 repeat:
1449         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV,
1450                            mdd_object_capa(env, obj));
1451         if (rc < 0)
1452                 GOTO(out, rc);
1453
1454         if (rc == 0)
1455                 GOTO(out, rc = -ENODATA);
1456
1457         sz = rc;
1458         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1459                 /* mti_big_buf was not allocated, so we have to
1460                  * allocate it based on the ea size */
1461                 buf = mdd_buf_alloc(env, sz);
1462                 if (buf->lb_buf == NULL)
1463                         GOTO(out, rc = -ENOMEM);
1464                 goto repeat;
1465         }
1466
1467         OBD_ALLOC_PTR(lmm_buf);
1468         if (!lmm_buf)
1469                 GOTO(out, rc = -ENOMEM);
1470
1471         OBD_ALLOC(lmm_buf->lb_buf, sz);
1472         if (!lmm_buf->lb_buf)
1473                 GOTO(free, rc = -ENOMEM);
1474
1475         memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
1476         lmm_buf->lb_len = sz;
1477
1478         GOTO(out, rc = 0);
1479
1480 free:
1481         if (lmm_buf)
1482                 OBD_FREE_PTR(lmm_buf);
1483 out:
1484         if (rc)
1485                 return ERR_PTR(rc);
1486         return lmm_buf;
1487 }
1488
1489
1490 /*
1491  *  check if layout swapping between 2 objects is allowed
1492  *  the rules are:
1493  *  - same type of objects
1494  *  - same owner/group (so quotas are still valid)
1495  */
1496 static int mdd_layout_swap_allowed(const struct lu_env *env,
1497                                    struct mdd_object *o1,
1498                                    struct mdd_object *o2)
1499 {
1500         const struct lu_fid     *fid1, *fid2;
1501         __u32                    uid, gid;
1502         struct lu_attr          *tmp_la = &mdd_env_info(env)->mti_la;
1503         int                      rc;
1504         ENTRY;
1505
1506         fid1 = mdo2fid(o1);
1507         fid2 = mdo2fid(o2);
1508
1509         if (!fid_is_norm(fid1) || !fid_is_norm(fid2) ||
1510             (mdd_object_type(o1) != mdd_object_type(o2)))
1511                 RETURN(-EPERM);
1512
1513         tmp_la->la_valid = 0;
1514         rc = mdd_la_get(env, o1, tmp_la, BYPASS_CAPA);
1515         if (rc)
1516                 RETURN(rc);
1517         uid = tmp_la->la_uid;
1518         gid = tmp_la->la_gid;
1519
1520         tmp_la->la_valid = 0;
1521         rc = mdd_la_get(env, o2, tmp_la, BYPASS_CAPA);
1522         if (rc)
1523                 RETURN(rc);
1524
1525         if ((uid != tmp_la->la_uid) || (gid != tmp_la->la_gid))
1526                 RETURN(-EPERM);
1527
1528         RETURN(0);
1529 }
1530
1531 /**
1532  * swap layouts between 2 lustre objects
1533  */
1534 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1535                             struct md_object *obj2, __u64 flags)
1536 {
1537         struct mdd_object       *o1, *o2, *fst_o, *snd_o;
1538         struct lu_buf           *lmm1_buf = NULL, *lmm2_buf = NULL;
1539         struct lu_buf           *fst_buf, *snd_buf;
1540         struct lov_mds_md       *fst_lmm, *snd_lmm, *old_fst_lmm = NULL;
1541         struct thandle          *handle;
1542         struct mdd_device       *mdd = mdo2mdd(obj1);
1543         int                      rc;
1544         __u16                    fst_gen, snd_gen;
1545         ENTRY;
1546
1547         /* we have to sort the 2 obj, so locking will always
1548          * be in the same order, even in case of 2 concurrent swaps */
1549         rc = lu_fid_cmp(mdo2fid(md2mdd_obj(obj1)),
1550                        mdo2fid(md2mdd_obj(obj2)));
1551         /* same fid ? */
1552         if (rc == 0)
1553                 RETURN(-EPERM);
1554
1555         if (rc > 0) {
1556                 o1 = md2mdd_obj(obj1);
1557                 o2 = md2mdd_obj(obj2);
1558         } else {
1559                 o1 = md2mdd_obj(obj2);
1560                 o2 = md2mdd_obj(obj1);
1561         }
1562
1563         /* check if layout swapping is allowed */
1564         rc = mdd_layout_swap_allowed(env, o1, o2);
1565         if (rc)
1566                 RETURN(rc);
1567
1568         handle = mdd_trans_create(env, mdd);
1569         if (IS_ERR(handle))
1570                 RETURN(PTR_ERR(handle));
1571
1572         /* objects are already sorted */
1573         mdd_write_lock(env, o1, MOR_TGT_CHILD);
1574         mdd_write_lock(env, o2, MOR_TGT_CHILD);
1575
1576         lmm1_buf = mdd_get_lov_ea(env, o1);
1577         if (IS_ERR(lmm1_buf)) {
1578                 rc = PTR_ERR(lmm1_buf);
1579                 lmm1_buf = NULL;
1580                 if (rc != -ENODATA)
1581                         GOTO(unlock, rc);
1582         }
1583
1584         lmm2_buf = mdd_get_lov_ea(env, o2);
1585         if (IS_ERR(lmm2_buf)) {
1586                 rc = PTR_ERR(lmm2_buf);
1587                 lmm2_buf = NULL;
1588                 if (rc != -ENODATA)
1589                         GOTO(unlock, rc);
1590         }
1591
1592         /* swapping 2 non existant layouts is a success */
1593         if ((lmm1_buf == NULL) && (lmm2_buf == NULL))
1594                 GOTO(unlock, rc = 0);
1595
1596         /* to help inode migration between MDT, it is better to
1597          * start by the no layout file (if one), so we order the swap */
1598         if (lmm1_buf == NULL) {
1599                 fst_o = o1;
1600                 fst_buf = lmm1_buf;
1601                 snd_o = o2;
1602                 snd_buf = lmm2_buf;
1603         } else {
1604                 fst_o = o2;
1605                 fst_buf = lmm2_buf;
1606                 snd_o = o1;
1607                 snd_buf = lmm1_buf;
1608         }
1609
1610         /* lmm and generation layout initialization */
1611         if (fst_buf) {
1612                 fst_lmm = fst_buf->lb_buf;
1613                 fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen);
1614         } else {
1615                 fst_lmm = NULL;
1616                 fst_gen = 0;
1617         }
1618
1619         if (snd_buf) {
1620                 snd_lmm = snd_buf->lb_buf;
1621                 snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen);
1622         } else {
1623                 snd_lmm = NULL;
1624                 snd_gen = 0;
1625         }
1626
1627         /* save the orignal lmm common header of first file
1628          * to be able to roll back */
1629         OBD_ALLOC_PTR(old_fst_lmm);
1630         if (old_fst_lmm == NULL)
1631                 GOTO(unlock, rc = -ENOMEM);
1632
1633         memcpy(old_fst_lmm, fst_lmm, sizeof(*old_fst_lmm));
1634
1635         /* increase the generation layout numbers */
1636         snd_gen++;
1637         fst_gen++;
1638
1639         /* set the file specific informations in lmm */
1640         if (fst_lmm) {
1641                 fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen);
1642                 fst_lmm->lmm_object_seq = snd_lmm->lmm_object_seq;
1643                 fst_lmm->lmm_object_id = snd_lmm->lmm_object_id;
1644         }
1645
1646         if (snd_lmm) {
1647                 snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen);
1648                 snd_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1649                 snd_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1650         }
1651
1652         /* prepare transaction */
1653         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
1654                                    LU_XATTR_REPLACE, handle);
1655         if (rc)
1656                 GOTO(stop, rc);
1657
1658         rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf, XATTR_NAME_LOV,
1659                                    LU_XATTR_REPLACE, handle);
1660         if (rc)
1661                 GOTO(stop, rc);
1662
1663         rc = mdd_trans_start(env, mdd, handle);
1664         if (rc)
1665                 GOTO(stop, rc);
1666
1667         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV,
1668                            LU_XATTR_REPLACE, handle,
1669                            mdd_object_capa(env, fst_o));
1670         if (rc)
1671                 GOTO(stop, rc);
1672
1673         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
1674                            LU_XATTR_REPLACE, handle,
1675                            mdd_object_capa(env, snd_o));
1676         if (rc) {
1677                 int     rc2;
1678
1679                 /* failure on second file, but first was done, so we have
1680                  * to roll back first */
1681                 /* restore object_id, object_seq and generation number
1682                  * on first file */
1683                 if (fst_lmm) {
1684                         fst_lmm->lmm_object_id = old_fst_lmm->lmm_object_id;
1685                         fst_lmm->lmm_object_seq = old_fst_lmm->lmm_object_seq;
1686                         fst_lmm->lmm_layout_gen = old_fst_lmm->lmm_layout_gen;
1687                 }
1688
1689                 rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
1690                                     LU_XATTR_REPLACE, handle,
1691                                     mdd_object_capa(env, fst_o));
1692                 if (rc2) {
1693                         /* very bad day */
1694                         CERROR("%s: unable to roll back after swap layouts"
1695                                " failure between "DFID" and "DFID
1696                                " rc2 = %d rc = %d)\n",
1697                                mdd2obd_dev(mdd)->obd_name,
1698                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
1699                                rc2, rc);
1700                         /* a solution to avoid journal commit is to panic,
1701                          * but it has strong consequences so we use LBUG to
1702                          * allow sysdamin to choose to panic or not
1703                          */
1704                         LBUG();
1705                 }
1706                 GOTO(stop, rc);
1707         }
1708         EXIT;
1709
1710 stop:
1711         mdd_trans_stop(env, mdd, rc, handle);
1712 unlock:
1713         mdd_write_unlock(env, o2);
1714         mdd_write_unlock(env, o1);
1715
1716         if (lmm1_buf && lmm1_buf->lb_buf)
1717                 OBD_FREE(lmm1_buf->lb_buf, lmm1_buf->lb_len);
1718         if (lmm1_buf)
1719                 OBD_FREE_PTR(lmm1_buf);
1720
1721         if (lmm2_buf && lmm2_buf->lb_buf)
1722                 OBD_FREE(lmm2_buf->lb_buf, lmm2_buf->lb_len);
1723         if (lmm2_buf)
1724                 OBD_FREE_PTR(lmm2_buf);
1725
1726         if (old_fst_lmm)
1727                 OBD_FREE_PTR(old_fst_lmm);
1728
1729         return rc;
1730 }
1731
1732 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1733                 struct mdd_object *child, struct lu_attr *attr)
1734 {
1735         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1736         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1737         struct dt_object *nc = mdd_object_child(child);
1738
1739         /* @hint will be initialized by underlying device. */
1740         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1741 }
1742
1743 /*
1744  * do NOT or the MAY_*'s, you'll get the weakest
1745  */
1746 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1747 {
1748         int res = 0;
1749
1750         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1751          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1752          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1753          * owner can write to a file even if it is marked readonly to hide
1754          * its brokenness. (bug 5781) */
1755         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1756                 struct lu_ucred *uc = lu_ucred_check(env);
1757
1758                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1759                         return 0;
1760         }
1761
1762         if (flags & FMODE_READ)
1763                 res |= MAY_READ;
1764         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1765                 res |= MAY_WRITE;
1766         if (flags & MDS_FMODE_EXEC)
1767                 res = MAY_EXEC;
1768         return res;
1769 }
1770
1771 static int mdd_open_sanity_check(const struct lu_env *env,
1772                                  struct mdd_object *obj, int flag)
1773 {
1774         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1775         int mode, rc;
1776         ENTRY;
1777
1778         /* EEXIST check */
1779         if (mdd_is_dead_obj(obj))
1780                 RETURN(-ENOENT);
1781
1782         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1783         if (rc)
1784                RETURN(rc);
1785
1786         if (S_ISLNK(tmp_la->la_mode))
1787                 RETURN(-ELOOP);
1788
1789         mode = accmode(env, tmp_la, flag);
1790
1791         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1792                 RETURN(-EISDIR);
1793
1794         if (!(flag & MDS_OPEN_CREATED)) {
1795                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1796                 if (rc)
1797                         RETURN(rc);
1798         }
1799
1800         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1801             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1802                 flag &= ~MDS_OPEN_TRUNC;
1803
1804         /* For writing append-only file must open it with append mode. */
1805         if (mdd_is_append(obj)) {
1806                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1807                         RETURN(-EPERM);
1808                 if (flag & MDS_OPEN_TRUNC)
1809                         RETURN(-EPERM);
1810         }
1811
1812 #if 0
1813         /*
1814          * Now, flag -- O_NOATIME does not be packed by client.
1815          */
1816         if (flag & O_NOATIME) {
1817                 struct lu_ucred *uc = lu_ucred(env);
1818
1819                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1820                            (uc->uc_valid == UCRED_NEW)) &&
1821                     (uc->uc_fsuid != tmp_la->la_uid) &&
1822                     !md_capable(uc, CFS_CAP_FOWNER))
1823                         RETURN(-EPERM);
1824         }
1825 #endif
1826
1827         RETURN(0);
1828 }
1829
1830 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1831                     int flags)
1832 {
1833         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1834         int rc = 0;
1835
1836         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1837
1838         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1839         if (rc == 0)
1840                 mdd_obj->mod_count++;
1841
1842         mdd_write_unlock(env, mdd_obj);
1843         return rc;
1844 }
1845
1846 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1847                             struct md_attr *ma, struct thandle *handle)
1848 {
1849         return mdo_declare_destroy(env, obj, handle);
1850 }
1851
1852 /* return md_attr back,
1853  * if it is last unlink then return lov ea + llog cookie*/
1854 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1855                     struct md_attr *ma, struct thandle *handle)
1856 {
1857         int rc;
1858         ENTRY;
1859
1860         rc = mdo_destroy(env, obj, handle);
1861
1862         RETURN(rc);
1863 }
1864
1865 static int mdd_declare_close(const struct lu_env *env,
1866                              struct mdd_object *obj,
1867                              struct md_attr *ma,
1868                              struct thandle *handle)
1869 {
1870         int rc;
1871
1872         rc = orph_declare_index_delete(env, obj, handle);
1873         if (rc)
1874                 return rc;
1875
1876         return mdo_declare_destroy(env, obj, handle);
1877 }
1878
1879 /*
1880  * No permission check is needed.
1881  */
1882 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1883                      struct md_attr *ma, int mode)
1884 {
1885         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1886         struct mdd_device *mdd = mdo2mdd(obj);
1887         struct thandle    *handle = NULL;
1888         int rc, is_orphan = 0;
1889         ENTRY;
1890
1891         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1892                 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1893                 mdd_obj->mod_count--;
1894                 mdd_write_unlock(env, mdd_obj);
1895
1896                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1897                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1898                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1899                 RETURN(0);
1900         }
1901
1902         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
1903          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
1904         /* check without any lock */
1905         is_orphan = mdd_obj->mod_count == 1 &&
1906                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
1907
1908 again:
1909         if (is_orphan) {
1910                 handle = mdd_trans_create(env, mdo2mdd(obj));
1911                 if (IS_ERR(handle))
1912                         RETURN(PTR_ERR(handle));
1913
1914                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1915                 if (rc)
1916                         GOTO(stop, rc);
1917
1918                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1919                 if (rc)
1920                         GOTO(stop, rc);
1921
1922                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1923                 if (rc)
1924                         GOTO(stop, rc);
1925         }
1926
1927         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1928         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1929                         mdd_object_capa(env, mdd_obj));
1930         if (rc != 0) {
1931                 CERROR("Failed to get lu_attr of "DFID": %d\n",
1932                        PFID(mdd_object_fid(mdd_obj)), rc);
1933                 GOTO(out, rc);
1934         }
1935
1936         /* check again with lock */
1937         is_orphan = (mdd_obj->mod_count == 1) &&
1938                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
1939                      ma->ma_attr.la_nlink == 0);
1940
1941         if (is_orphan && handle == NULL) {
1942                 mdd_write_unlock(env, mdd_obj);
1943                 goto again;
1944         }
1945
1946         mdd_obj->mod_count--; /*release open count */
1947
1948         if (!is_orphan)
1949                 GOTO(out, rc = 0);
1950
1951         /* Orphan object */
1952         /* NB: Object maybe not in orphan list originally, it is rare case for
1953          * mdd_finish_unlink() failure, in that case, the object doesn't have
1954          * ORPHAN_OBJ flag */
1955         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1956                 /* remove link to object from orphan index */
1957                 LASSERT(handle != NULL);
1958                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1959                 if (rc != 0) {
1960                         CERROR("%s: unable to delete "DFID" from orphan list: "
1961                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1962                                PFID(mdd_object_fid(mdd_obj)), rc);
1963                         /* If object was not deleted from orphan list, do not
1964                          * destroy OSS objects, which will be done when next
1965                          * recovery. */
1966                         GOTO(out, rc);
1967                 }
1968
1969                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1970                        "list, OSS objects to be destroyed.\n",
1971                        PFID(mdd_object_fid(mdd_obj)));
1972         }
1973
1974         rc = mdo_destroy(env, mdd_obj, handle);
1975
1976         if (rc != 0) {
1977                 CERROR("%s: unable to delete "DFID" from orphan list: "
1978                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
1979                        PFID(mdd_object_fid(mdd_obj)), rc);
1980         }
1981         EXIT;
1982
1983 out:
1984         mdd_write_unlock(env, mdd_obj);
1985
1986         if (rc == 0 &&
1987             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1988             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1989                 if (handle == NULL) {
1990                         handle = mdd_trans_create(env, mdo2mdd(obj));
1991                         if (IS_ERR(handle))
1992                                 GOTO(stop, rc = IS_ERR(handle));
1993
1994                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1995                                                          handle);
1996                         if (rc)
1997                                 GOTO(stop, rc);
1998
1999                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2000                         if (rc)
2001                                 GOTO(stop, rc);
2002                 }
2003
2004                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2005                                          mdd_obj, handle);
2006         }
2007
2008 stop:
2009         if (handle != NULL)
2010                 mdd_trans_stop(env, mdd, rc, handle);
2011         return rc;
2012 }
2013
2014 /*
2015  * Permission check is done when open,
2016  * no need check again.
2017  */
2018 static int mdd_readpage_sanity_check(const struct lu_env *env,
2019                                      struct mdd_object *obj)
2020 {
2021         struct dt_object *next = mdd_object_child(obj);
2022         int rc;
2023         ENTRY;
2024
2025         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2026                 rc = 0;
2027         else
2028                 rc = -ENOTDIR;
2029
2030         RETURN(rc);
2031 }
2032
2033 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2034                               int nob, const struct dt_it_ops *iops,
2035                               struct dt_it *it, __u32 attr, void *arg)
2036 {
2037         struct lu_dirpage       *dp = &lp->lp_dir;
2038         void                    *area = dp;
2039         int                      result;
2040         __u64                    hash = 0;
2041         struct lu_dirent        *ent;
2042         struct lu_dirent        *last = NULL;
2043         int                      first = 1;
2044
2045         memset(area, 0, sizeof (*dp));
2046         area += sizeof (*dp);
2047         nob  -= sizeof (*dp);
2048
2049         ent  = area;
2050         do {
2051                 int    len;
2052                 int    recsize;
2053
2054                 len = iops->key_size(env, it);
2055
2056                 /* IAM iterator can return record with zero len. */
2057                 if (len == 0)
2058                         goto next;
2059
2060                 hash = iops->store(env, it);
2061                 if (unlikely(first)) {
2062                         first = 0;
2063                         dp->ldp_hash_start = cpu_to_le64(hash);
2064                 }
2065
2066                 /* calculate max space required for lu_dirent */
2067                 recsize = lu_dirent_calc_size(len, attr);
2068
2069                 if (nob >= recsize) {
2070                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2071                         if (result == -ESTALE)
2072                                 goto next;
2073                         if (result != 0)
2074                                 goto out;
2075
2076                         /* osd might not able to pack all attributes,
2077                          * so recheck rec length */
2078                         recsize = le16_to_cpu(ent->lde_reclen);
2079                 } else {
2080                         result = (last != NULL) ? 0 :-EINVAL;
2081                         goto out;
2082                 }
2083                 last = ent;
2084                 ent = (void *)ent + recsize;
2085                 nob -= recsize;
2086
2087 next:
2088                 result = iops->next(env, it);
2089                 if (result == -ESTALE)
2090                         goto next;
2091         } while (result == 0);
2092
2093 out:
2094         dp->ldp_hash_end = cpu_to_le64(hash);
2095         if (last != NULL) {
2096                 if (last->lde_hash == dp->ldp_hash_end)
2097                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2098                 last->lde_reclen = 0; /* end mark */
2099         }
2100         if (result > 0)
2101                 /* end of directory */
2102                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2103         else if (result < 0)
2104                 CWARN("build page failed: %d!\n", result);
2105         return result;
2106 }
2107
2108 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2109                  const struct lu_rdpg *rdpg)
2110 {
2111         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2112         int rc;
2113         ENTRY;
2114
2115         if (mdd_object_exists(mdd_obj) == 0) {
2116                 CERROR("%s: object "DFID" not found: rc = -2\n",
2117                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2118                 return -ENOENT;
2119         }
2120
2121         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2122         rc = mdd_readpage_sanity_check(env, mdd_obj);
2123         if (rc)
2124                 GOTO(out_unlock, rc);
2125
2126         if (mdd_is_dead_obj(mdd_obj)) {
2127                 struct page *pg;
2128                 struct lu_dirpage *dp;
2129
2130                 /*
2131                  * According to POSIX, please do not return any entry to client:
2132                  * even dot and dotdot should not be returned.
2133                  */
2134                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2135                        PFID(mdd_object_fid(mdd_obj)));
2136
2137                 if (rdpg->rp_count <= 0)
2138                         GOTO(out_unlock, rc = -EFAULT);
2139                 LASSERT(rdpg->rp_pages != NULL);
2140
2141                 pg = rdpg->rp_pages[0];
2142                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2143                 memset(dp, 0 , sizeof(struct lu_dirpage));
2144                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2145                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2146                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2147                 cfs_kunmap(pg);
2148                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2149         }
2150
2151         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2152                            mdd_dir_page_build, NULL);
2153         if (rc >= 0) {
2154                 struct lu_dirpage       *dp;
2155
2156                 dp = cfs_kmap(rdpg->rp_pages[0]);
2157                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2158                 if (rc == 0) {
2159                         /*
2160                          * No pages were processed, mark this for first page
2161                          * and send back.
2162                          */
2163                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2164                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2165                 }
2166                 cfs_kunmap(rdpg->rp_pages[0]);
2167         }
2168
2169         GOTO(out_unlock, rc);
2170 out_unlock:
2171         mdd_read_unlock(env, mdd_obj);
2172         return rc;
2173 }
2174
2175 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2176 {
2177         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2178
2179         if (mdd_object_exists(mdd_obj) == 0) {
2180                 CERROR("%s: object "DFID" not found: rc = -2\n",
2181                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2182                 return -ENOENT;
2183         }
2184         return dt_object_sync(env, mdd_object_child(mdd_obj));
2185 }
2186
2187 static int mdd_object_lock(const struct lu_env *env,
2188                            struct md_object *obj,
2189                            struct lustre_handle *lh,
2190                            struct ldlm_enqueue_info *einfo,
2191                            void *policy)
2192 {
2193         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2194         LASSERT(mdd_object_exists(mdd_obj));
2195         return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
2196                               einfo, policy);
2197 }
2198
2199 const struct md_object_operations mdd_obj_ops = {
2200         .moo_permission         = mdd_permission,
2201         .moo_attr_get           = mdd_attr_get,
2202         .moo_attr_set           = mdd_attr_set,
2203         .moo_xattr_get          = mdd_xattr_get,
2204         .moo_xattr_set          = mdd_xattr_set,
2205         .moo_xattr_list         = mdd_xattr_list,
2206         .moo_xattr_del          = mdd_xattr_del,
2207         .moo_swap_layouts       = mdd_swap_layouts,
2208         .moo_open               = mdd_open,
2209         .moo_close              = mdd_close,
2210         .moo_readpage           = mdd_readpage,
2211         .moo_readlink           = mdd_readlink,
2212         .moo_changelog          = mdd_changelog,
2213         .moo_capa_get           = mdd_capa_get,
2214         .moo_object_sync        = mdd_object_sync,
2215         .moo_path               = mdd_path,
2216         .moo_object_lock        = mdd_object_lock,
2217 };