Whamcloud - gitweb
LU-1303 mds: integration lod/osp into the stack
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct lu_object *mdd_object_alloc(const struct lu_env *env,
184                                    const struct lu_object_header *hdr,
185                                    struct lu_device *d)
186 {
187         struct mdd_object *mdd_obj;
188
189         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
190         if (mdd_obj != NULL) {
191                 struct lu_object *o;
192
193                 o = mdd2lu_obj(mdd_obj);
194                 lu_object_init(o, NULL, d);
195                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
196                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
197                 mdd_obj->mod_count = 0;
198                 o->lo_ops = &mdd_lu_obj_ops;
199                 return o;
200         } else {
201                 return NULL;
202         }
203 }
204
205 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
206                            const struct lu_object_conf *unused)
207 {
208         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
209         struct mdd_object *mdd_obj = lu2mdd_obj(o);
210         struct lu_object  *below;
211         struct lu_device  *under;
212         ENTRY;
213
214         mdd_obj->mod_cltime = 0;
215         under = &d->mdd_child->dd_lu_dev;
216         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
217         mdd_pdlock_init(mdd_obj);
218         if (below == NULL)
219                 RETURN(-ENOMEM);
220
221         lu_object_add(o, below);
222
223         RETURN(0);
224 }
225
226 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
227 {
228         if (lu_object_exists(o))
229                 return mdd_get_flags(env, lu2mdd_obj(o));
230         else
231                 return 0;
232 }
233
234 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
235 {
236         struct mdd_object *mdd = lu2mdd_obj(o);
237
238         lu_object_fini(o);
239         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
240 }
241
242 static int mdd_object_print(const struct lu_env *env, void *cookie,
243                             lu_printer_t p, const struct lu_object *o)
244 {
245         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
246         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
247                     "valid=%x, cltime="LPU64", flags=%lx)",
248                     mdd, mdd->mod_count, mdd->mod_valid,
249                     mdd->mod_cltime, mdd->mod_flags);
250 }
251
252 static const struct lu_object_operations mdd_lu_obj_ops = {
253         .loo_object_init    = mdd_object_init,
254         .loo_object_start   = mdd_object_start,
255         .loo_object_free    = mdd_object_free,
256         .loo_object_print   = mdd_object_print,
257 };
258
259 struct mdd_object *mdd_object_find(const struct lu_env *env,
260                                    struct mdd_device *d,
261                                    const struct lu_fid *f)
262 {
263         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
264 }
265
266 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
267                         const char *path, struct lu_fid *fid)
268 {
269         struct lu_buf *buf;
270         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
271         struct mdd_object *obj;
272         struct lu_name *lname = &mdd_env_info(env)->mti_name;
273         char *name;
274         int rc = 0;
275         ENTRY;
276
277         /* temp buffer for path element */
278         buf = mdd_buf_alloc(env, PATH_MAX);
279         if (buf->lb_buf == NULL)
280                 RETURN(-ENOMEM);
281
282         lname->ln_name = name = buf->lb_buf;
283         lname->ln_namelen = 0;
284         *f = mdd->mdd_root_fid;
285
286         while(1) {
287                 while (*path == '/')
288                         path++;
289                 if (*path == '\0')
290                         break;
291                 while (*path != '/' && *path != '\0') {
292                         *name = *path;
293                         path++;
294                         name++;
295                         lname->ln_namelen++;
296                 }
297
298                 *name = '\0';
299                 /* find obj corresponding to fid */
300                 obj = mdd_object_find(env, mdd, f);
301                 if (obj == NULL)
302                         GOTO(out, rc = -EREMOTE);
303                 if (IS_ERR(obj))
304                         GOTO(out, rc = PTR_ERR(obj));
305                 /* get child fid from parent and name */
306                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
307                 mdd_object_put(env, obj);
308                 if (rc)
309                         break;
310
311                 name = buf->lb_buf;
312                 lname->ln_namelen = 0;
313         }
314
315         if (!rc)
316                 *fid = *f;
317 out:
318         RETURN(rc);
319 }
320
321 /** The maximum depth that fid2path() will search.
322  * This is limited only because we want to store the fids for
323  * historical path lookup purposes.
324  */
325 #define MAX_PATH_DEPTH 100
326
327 /** mdd_path() lookup structure. */
328 struct path_lookup_info {
329         __u64                pli_recno;        /**< history point */
330         __u64                pli_currec;       /**< current record */
331         struct lu_fid        pli_fid;
332         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
333         struct mdd_object   *pli_mdd_obj;
334         char                *pli_path;         /**< full path */
335         int                  pli_pathlen;
336         int                  pli_linkno;       /**< which hardlink to follow */
337         int                  pli_fidcount;     /**< number of \a pli_fids */
338 };
339
340 static int mdd_path_current(const struct lu_env *env,
341                             struct path_lookup_info *pli)
342 {
343         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
344         struct mdd_object *mdd_obj;
345         struct lu_buf     *buf = NULL;
346         struct link_ea_header *leh;
347         struct link_ea_entry  *lee;
348         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
349         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
350         char *ptr;
351         int reclen;
352         int rc;
353         ENTRY;
354
355         ptr = pli->pli_path + pli->pli_pathlen - 1;
356         *ptr = 0;
357         --ptr;
358         pli->pli_fidcount = 0;
359         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
360
361         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
362                 mdd_obj = mdd_object_find(env, mdd,
363                                           &pli->pli_fids[pli->pli_fidcount]);
364                 if (mdd_obj == NULL)
365                         GOTO(out, rc = -EREMOTE);
366                 if (IS_ERR(mdd_obj))
367                         GOTO(out, rc = PTR_ERR(mdd_obj));
368                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
369                 if (rc <= 0) {
370                         mdd_object_put(env, mdd_obj);
371                         if (rc == -1)
372                                 rc = -EREMOTE;
373                         else if (rc == 0)
374                                 /* Do I need to error out here? */
375                                 rc = -ENOENT;
376                         GOTO(out, rc);
377                 }
378
379                 /* Get parent fid and object name */
380                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
381                 buf = mdd_links_get(env, mdd_obj);
382                 mdd_read_unlock(env, mdd_obj);
383                 mdd_object_put(env, mdd_obj);
384                 if (IS_ERR(buf))
385                         GOTO(out, rc = PTR_ERR(buf));
386
387                 leh = buf->lb_buf;
388                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
389                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
390
391                 /* If set, use link #linkno for path lookup, otherwise use
392                    link #0.  Only do this for the final path element. */
393                 if ((pli->pli_fidcount == 0) &&
394                     (pli->pli_linkno < leh->leh_reccount)) {
395                         int count;
396                         for (count = 0; count < pli->pli_linkno; count++) {
397                                 lee = (struct link_ea_entry *)
398                                      ((char *)lee + reclen);
399                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
400                         }
401                         if (pli->pli_linkno < leh->leh_reccount - 1)
402                                 /* indicate to user there are more links */
403                                 pli->pli_linkno++;
404                 }
405
406                 /* Pack the name in the end of the buffer */
407                 ptr -= tmpname->ln_namelen;
408                 if (ptr - 1 <= pli->pli_path)
409                         GOTO(out, rc = -EOVERFLOW);
410                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
411                 *(--ptr) = '/';
412
413                 /* Store the parent fid for historic lookup */
414                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
415                         GOTO(out, rc = -EOVERFLOW);
416                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
417         }
418
419         /* Verify that our path hasn't changed since we started the lookup.
420            Record the current index, and verify the path resolves to the
421            same fid. If it does, then the path is correct as of this index. */
422         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
423         pli->pli_currec = mdd->mdd_cl.mc_index;
424         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
425         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
426         if (rc) {
427                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
428                 GOTO (out, rc = -EAGAIN);
429         }
430         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
431                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
432                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
433                        PFID(&pli->pli_fid));
434                 GOTO(out, rc = -EAGAIN);
435         }
436         ptr++; /* skip leading / */
437         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
438
439         EXIT;
440 out:
441         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
442                 /* if we vmalloced a large buffer drop it */
443                 mdd_buf_put(buf);
444
445         return rc;
446 }
447
448 static int mdd_path_historic(const struct lu_env *env,
449                              struct path_lookup_info *pli)
450 {
451         return 0;
452 }
453
454 /* Returns the full path to this fid, as of changelog record recno. */
455 static int mdd_path(const struct lu_env *env, struct md_object *obj,
456                     char *path, int pathlen, __u64 *recno, int *linkno)
457 {
458         struct path_lookup_info *pli;
459         int tries = 3;
460         int rc = -EAGAIN;
461         ENTRY;
462
463         if (pathlen < 3)
464                 RETURN(-EOVERFLOW);
465
466         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
467                 path[0] = '\0';
468                 RETURN(0);
469         }
470
471         OBD_ALLOC_PTR(pli);
472         if (pli == NULL)
473                 RETURN(-ENOMEM);
474
475         pli->pli_mdd_obj = md2mdd_obj(obj);
476         pli->pli_recno = *recno;
477         pli->pli_path = path;
478         pli->pli_pathlen = pathlen;
479         pli->pli_linkno = *linkno;
480
481         /* Retry multiple times in case file is being moved */
482         while (tries-- && rc == -EAGAIN)
483                 rc = mdd_path_current(env, pli);
484
485         /* For historical path lookup, the current links may not have existed
486          * at "recno" time.  We must switch over to earlier links/parents
487          * by using the changelog records.  If the earlier parent doesn't
488          * exist, we must search back through the changelog to reconstruct
489          * its parents, then check if it exists, etc.
490          * We may ignore this problem for the initial implementation and
491          * state that an "original" hardlink must still exist for us to find
492          * historic path name. */
493         if (pli->pli_recno != -1) {
494                 rc = mdd_path_historic(env, pli);
495         } else {
496                 *recno = pli->pli_currec;
497                 /* Return next link index to caller */
498                 *linkno = pli->pli_linkno;
499         }
500
501         OBD_FREE_PTR(pli);
502
503         RETURN (rc);
504 }
505
506 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
507 {
508         struct lu_attr *la = &mdd_env_info(env)->mti_la;
509         int rc;
510
511         ENTRY;
512         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
513         if (rc == 0) {
514                 mdd_flags_xlate(obj, la->la_flags);
515         }
516         RETURN(rc);
517 }
518
519 /*
520  * No permission check is needed.
521  */
522 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
523                  struct md_attr *ma)
524 {
525         int rc;
526         ENTRY;
527
528         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
529                           mdd_object_capa(env, md2mdd_obj(obj)));
530         RETURN(rc);
531 }
532
533 /*
534  * No permission check is needed.
535  */
536 static int mdd_xattr_get(const struct lu_env *env,
537                          struct md_object *obj, struct lu_buf *buf,
538                          const char *name)
539 {
540         struct mdd_object *mdd_obj = md2mdd_obj(obj);
541         int rc;
542
543         ENTRY;
544
545         if (mdd_object_exists(mdd_obj) == 0) {
546                 CERROR("%s: object "DFID" not found: rc = -2\n",
547                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
548                 return -ENOENT;
549         }
550
551         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
552         rc = mdo_xattr_get(env, mdd_obj, buf, name,
553                            mdd_object_capa(env, mdd_obj));
554         mdd_read_unlock(env, mdd_obj);
555
556         RETURN(rc);
557 }
558
559 /*
560  * Permission check is done when open,
561  * no need check again.
562  */
563 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
564                         struct lu_buf *buf)
565 {
566         struct mdd_object *mdd_obj = md2mdd_obj(obj);
567         struct dt_object  *next;
568         loff_t             pos = 0;
569         int                rc;
570         ENTRY;
571
572         if (mdd_object_exists(mdd_obj) == 0) {
573                 CERROR("%s: object "DFID" not found: rc = -2\n",
574                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
575                 return -ENOENT;
576         }
577
578         next = mdd_object_child(mdd_obj);
579         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
580         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
581                                          mdd_object_capa(env, mdd_obj));
582         mdd_read_unlock(env, mdd_obj);
583         RETURN(rc);
584 }
585
586 /*
587  * No permission check is needed.
588  */
589 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
590                           struct lu_buf *buf)
591 {
592         struct mdd_object *mdd_obj = md2mdd_obj(obj);
593         int rc;
594
595         ENTRY;
596
597         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
598         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
599         mdd_read_unlock(env, mdd_obj);
600
601         RETURN(rc);
602 }
603
604 int mdd_declare_object_create_internal(const struct lu_env *env,
605                                        struct mdd_object *p,
606                                        struct mdd_object *c,
607                                        struct lu_attr *attr,
608                                        struct thandle *handle,
609                                        const struct md_op_spec *spec)
610 {
611         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
612         const struct dt_index_features *feat = spec->sp_feat;
613         int rc;
614         ENTRY;
615
616         if (feat != &dt_directory_features && feat != NULL) {
617                 dof->dof_type = DFT_INDEX;
618                 dof->u.dof_idx.di_feat = feat;
619
620         } else {
621                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
622                 if (dof->dof_type == DFT_REGULAR) {
623                         dof->u.dof_reg.striped =
624                                 md_should_create(spec->sp_cr_flags);
625                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
626                                 dof->u.dof_reg.striped = 0;
627                         /* is this replay? */
628                         if (spec->no_create)
629                                 dof->u.dof_reg.striped = 0;
630                 }
631         }
632
633         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
634
635         RETURN(rc);
636 }
637
638 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
639                                struct mdd_object *c, struct lu_attr *attr,
640                                struct thandle *handle,
641                                const struct md_op_spec *spec)
642 {
643         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
644         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
645         int rc;
646         ENTRY;
647
648         LASSERT(!mdd_object_exists(c));
649
650         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
651
652         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
653
654         RETURN(rc);
655 }
656
657 /**
658  * Make sure the ctime is increased only.
659  */
660 static inline int mdd_attr_check(const struct lu_env *env,
661                                  struct mdd_object *obj,
662                                  struct lu_attr *attr)
663 {
664         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
665         int rc;
666         ENTRY;
667
668         if (attr->la_valid & LA_CTIME) {
669                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
670                 if (rc)
671                         RETURN(rc);
672
673                 if (attr->la_ctime < tmp_la->la_ctime)
674                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
675                 else if (attr->la_valid == LA_CTIME &&
676                          attr->la_ctime == tmp_la->la_ctime)
677                         attr->la_valid &= ~LA_CTIME;
678         }
679         RETURN(0);
680 }
681
682 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
683                           struct lu_attr *attr, struct thandle *handle,
684                           int needacl)
685 {
686         int rc;
687         ENTRY;
688
689         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
690 #ifdef CONFIG_FS_POSIX_ACL
691         if (!rc && (attr->la_valid & LA_MODE) && needacl)
692                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
693 #endif
694         RETURN(rc);
695 }
696
697 int mdd_attr_check_set_internal(const struct lu_env *env,
698                                 struct mdd_object *obj, struct lu_attr *attr,
699                                 struct thandle *handle, int needacl)
700 {
701         int rc;
702         ENTRY;
703
704         rc = mdd_attr_check(env, obj, attr);
705         if (rc)
706                 RETURN(rc);
707
708         if (attr->la_valid)
709                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
710         RETURN(rc);
711 }
712
713 /*
714  * This gives the same functionality as the code between
715  * sys_chmod and inode_setattr
716  * chown_common and inode_setattr
717  * utimes and inode_setattr
718  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
719  */
720 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
721                         struct lu_attr *la, const unsigned long flags)
722 {
723         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
724         struct md_ucred  *uc;
725         int               rc;
726         ENTRY;
727
728         if (!la->la_valid)
729                 RETURN(0);
730
731         /* Do not permit change file type */
732         if (la->la_valid & LA_TYPE)
733                 RETURN(-EPERM);
734
735         /* They should not be processed by setattr */
736         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
737                 RETURN(-EPERM);
738
739         /* export destroy does not have ->le_ses, but we may want
740          * to drop LUSTRE_SOM_FL. */
741         if (!env->le_ses)
742                 RETURN(0);
743
744         uc = md_ucred(env);
745
746         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
747         if (rc)
748                 RETURN(rc);
749
750         if (la->la_valid == LA_CTIME) {
751                 if (!(flags & MDS_PERM_BYPASS))
752                         /* This is only for set ctime when rename's source is
753                          * on remote MDS. */
754                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
755                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
756                         la->la_valid &= ~LA_CTIME;
757                 RETURN(rc);
758         }
759
760         if (la->la_valid == LA_ATIME) {
761                 /* This is atime only set for read atime update on close. */
762                 if (la->la_atime >= tmp_la->la_atime &&
763                     la->la_atime < (tmp_la->la_atime +
764                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
765                         la->la_valid &= ~LA_ATIME;
766                 RETURN(0);
767         }
768
769         /* Check if flags change. */
770         if (la->la_valid & LA_FLAGS) {
771                 unsigned int oldflags = 0;
772                 unsigned int newflags = la->la_flags &
773                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
774
775                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
776                     !mdd_capable(uc, CFS_CAP_FOWNER))
777                         RETURN(-EPERM);
778
779                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
780                  * only be changed by the relevant capability. */
781                 if (mdd_is_immutable(obj))
782                         oldflags |= LUSTRE_IMMUTABLE_FL;
783                 if (mdd_is_append(obj))
784                         oldflags |= LUSTRE_APPEND_FL;
785                 if ((oldflags ^ newflags) &&
786                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
787                         RETURN(-EPERM);
788
789                 if (!S_ISDIR(tmp_la->la_mode))
790                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
791         }
792
793         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
794             (la->la_valid & ~LA_FLAGS) &&
795             !(flags & MDS_PERM_BYPASS))
796                 RETURN(-EPERM);
797
798         /* Check for setting the obj time. */
799         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
800             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
801                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
802                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
803                         rc = mdd_permission_internal(env, obj, tmp_la,
804                                                      MAY_WRITE);
805                         if (rc)
806                                 RETURN(rc);
807                 }
808         }
809
810         if (la->la_valid & LA_KILL_SUID) {
811                 la->la_valid &= ~LA_KILL_SUID;
812                 if ((tmp_la->la_mode & S_ISUID) &&
813                     !(la->la_valid & LA_MODE)) {
814                         la->la_mode = tmp_la->la_mode;
815                         la->la_valid |= LA_MODE;
816                 }
817                 la->la_mode &= ~S_ISUID;
818         }
819
820         if (la->la_valid & LA_KILL_SGID) {
821                 la->la_valid &= ~LA_KILL_SGID;
822                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
823                                         (S_ISGID | S_IXGRP)) &&
824                     !(la->la_valid & LA_MODE)) {
825                         la->la_mode = tmp_la->la_mode;
826                         la->la_valid |= LA_MODE;
827                 }
828                 la->la_mode &= ~S_ISGID;
829         }
830
831         /* Make sure a caller can chmod. */
832         if (la->la_valid & LA_MODE) {
833                 if (!(flags & MDS_PERM_BYPASS) &&
834                     (uc->mu_fsuid != tmp_la->la_uid) &&
835                     !mdd_capable(uc, CFS_CAP_FOWNER))
836                         RETURN(-EPERM);
837
838                 if (la->la_mode == (cfs_umode_t) -1)
839                         la->la_mode = tmp_la->la_mode;
840                 else
841                         la->la_mode = (la->la_mode & S_IALLUGO) |
842                                       (tmp_la->la_mode & ~S_IALLUGO);
843
844                 /* Also check the setgid bit! */
845                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
846                                        la->la_gid : tmp_la->la_gid) &&
847                     !mdd_capable(uc, CFS_CAP_FSETID))
848                         la->la_mode &= ~S_ISGID;
849         } else {
850                la->la_mode = tmp_la->la_mode;
851         }
852
853         /* Make sure a caller can chown. */
854         if (la->la_valid & LA_UID) {
855                 if (la->la_uid == (uid_t) -1)
856                         la->la_uid = tmp_la->la_uid;
857                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
858                     (la->la_uid != tmp_la->la_uid)) &&
859                     !mdd_capable(uc, CFS_CAP_CHOWN))
860                         RETURN(-EPERM);
861
862                 /* If the user or group of a non-directory has been
863                  * changed by a non-root user, remove the setuid bit.
864                  * 19981026 David C Niemi <niemi@tux.org>
865                  *
866                  * Changed this to apply to all users, including root,
867                  * to avoid some races. This is the behavior we had in
868                  * 2.0. The check for non-root was definitely wrong
869                  * for 2.2 anyway, as it should have been using
870                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
871                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
872                     !S_ISDIR(tmp_la->la_mode)) {
873                         la->la_mode &= ~S_ISUID;
874                         la->la_valid |= LA_MODE;
875                 }
876         }
877
878         /* Make sure caller can chgrp. */
879         if (la->la_valid & LA_GID) {
880                 if (la->la_gid == (gid_t) -1)
881                         la->la_gid = tmp_la->la_gid;
882                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
883                     ((la->la_gid != tmp_la->la_gid) &&
884                     !lustre_in_group_p(uc, la->la_gid))) &&
885                     !mdd_capable(uc, CFS_CAP_CHOWN))
886                         RETURN(-EPERM);
887
888                 /* Likewise, if the user or group of a non-directory
889                  * has been changed by a non-root user, remove the
890                  * setgid bit UNLESS there is no group execute bit
891                  * (this would be a file marked for mandatory
892                  * locking).  19981026 David C Niemi <niemi@tux.org>
893                  *
894                  * Removed the fsuid check (see the comment above) --
895                  * 19990830 SD. */
896                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
897                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
898                         la->la_mode &= ~S_ISGID;
899                         la->la_valid |= LA_MODE;
900                 }
901         }
902
903         /* For both Size-on-MDS case and truncate case,
904          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
905          * We distinguish them by "flags & MDS_SOM".
906          * For SOM case, it is true, the MAY_WRITE perm has been checked
907          * when open, no need check again. For truncate case, it is false,
908          * the MAY_WRITE perm should be checked here. */
909         if (flags & MDS_SOM) {
910                 /* For the "Size-on-MDS" setattr update, merge coming
911                  * attributes with the set in the inode. BUG 10641 */
912                 if ((la->la_valid & LA_ATIME) &&
913                     (la->la_atime <= tmp_la->la_atime))
914                         la->la_valid &= ~LA_ATIME;
915
916                 /* OST attributes do not have a priority over MDS attributes,
917                  * so drop times if ctime is equal. */
918                 if ((la->la_valid & LA_CTIME) &&
919                     (la->la_ctime <= tmp_la->la_ctime))
920                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
921         } else {
922                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
923                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
924                               (uc->mu_fsuid == tmp_la->la_uid)) &&
925                             !(flags & MDS_PERM_BYPASS)) {
926                                 rc = mdd_permission_internal(env, obj,
927                                                              tmp_la, MAY_WRITE);
928                                 if (rc)
929                                         RETURN(rc);
930                         }
931                 }
932                 if (la->la_valid & LA_CTIME) {
933                         /* The pure setattr, it has the priority over what is
934                          * already set, do not drop it if ctime is equal. */
935                         if (la->la_ctime < tmp_la->la_ctime)
936                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
937                                                   LA_CTIME);
938                 }
939         }
940
941         RETURN(0);
942 }
943
944 /** Store a data change changelog record
945  * If this fails, we must fail the whole transaction; we don't
946  * want the change to commit without the log entry.
947  * \param mdd_obj - mdd_object of change
948  * \param handle - transacion handle
949  */
950 static int mdd_changelog_data_store(const struct lu_env     *env,
951                                     struct mdd_device       *mdd,
952                                     enum changelog_rec_type type,
953                                     int                     flags,
954                                     struct mdd_object       *mdd_obj,
955                                     struct thandle          *handle)
956 {
957         const struct lu_fid *tfid = mdo2fid(mdd_obj);
958         struct llog_changelog_rec *rec;
959         struct thandle *th = NULL;
960         struct lu_buf *buf;
961         int reclen;
962         int rc;
963
964         /* Not recording */
965         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
966                 RETURN(0);
967         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
968                 RETURN(0);
969
970         LASSERT(mdd_obj != NULL);
971         LASSERT(handle != NULL);
972
973         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
974             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
975                 /* Don't need multiple updates in this log */
976                 /* Don't check under lock - no big deal if we get an extra
977                    entry */
978                 RETURN(0);
979         }
980
981         reclen = llog_data_len(sizeof(*rec));
982         buf = mdd_buf_alloc(env, reclen);
983         if (buf->lb_buf == NULL)
984                 RETURN(-ENOMEM);
985         rec = (struct llog_changelog_rec *)buf->lb_buf;
986
987         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
988         rec->cr.cr_type = (__u32)type;
989         rec->cr.cr_tfid = *tfid;
990         rec->cr.cr_namelen = 0;
991         mdd_obj->mod_cltime = cfs_time_current_64();
992
993         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
994
995         if (th)
996                 mdd_trans_stop(env, mdd, rc, th);
997
998         if (rc < 0) {
999                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1000                        rc, type, PFID(tfid));
1001                 return -EFAULT;
1002         }
1003
1004         return 0;
1005 }
1006
1007 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1008                   int flags, struct md_object *obj)
1009 {
1010         struct thandle *handle;
1011         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1012         struct mdd_device *mdd = mdo2mdd(obj);
1013         int rc;
1014         ENTRY;
1015
1016         handle = mdd_trans_create(env, mdd);
1017         if (IS_ERR(handle))
1018                 return(PTR_ERR(handle));
1019
1020         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1021         if (rc)
1022                 GOTO(stop, rc);
1023
1024         rc = mdd_trans_start(env, mdd, handle);
1025         if (rc)
1026                 GOTO(stop, rc);
1027
1028         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1029                                       handle);
1030
1031 stop:
1032         mdd_trans_stop(env, mdd, rc, handle);
1033
1034         RETURN(rc);
1035 }
1036
1037 /**
1038  * Save LMA extended attributes with data from \a ma.
1039  *
1040  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1041  * not, LMA EA will be first read from disk, modified and write back.
1042  *
1043  */
1044 /* Precedence for choosing record type when multiple
1045  * attributes change: setattr > mtime > ctime > atime
1046  * (ctime changes when mtime does, plus chmod/chown.
1047  * atime and ctime are independent.) */
1048 static int mdd_attr_set_changelog(const struct lu_env *env,
1049                                   struct md_object *obj, struct thandle *handle,
1050                                   __u64 valid)
1051 {
1052         struct mdd_device *mdd = mdo2mdd(obj);
1053         int bits, type = 0;
1054
1055         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1056         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1057         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1058         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1059         bits = bits & mdd->mdd_cl.mc_mask;
1060         if (bits == 0)
1061                 return 0;
1062
1063         /* The record type is the lowest non-masked set bit */
1064         while (bits && ((bits & 1) == 0)) {
1065                 bits = bits >> 1;
1066                 type++;
1067         }
1068
1069         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1070         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1071                                         md2mdd_obj(obj), handle);
1072 }
1073
1074 static int mdd_declare_attr_set(const struct lu_env *env,
1075                                 struct mdd_device *mdd,
1076                                 struct mdd_object *obj,
1077                                 const struct lu_attr *attr,
1078                                 struct thandle *handle)
1079 {
1080         int rc;
1081
1082         rc = mdo_declare_attr_set(env, obj, attr, handle);
1083         if (rc)
1084                 return rc;
1085
1086         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1087         if (rc)
1088                 return rc;
1089
1090 #ifdef CONFIG_FS_POSIX_ACL
1091         if (attr->la_valid & LA_MODE) {
1092                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1093                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1094                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1095                 mdd_read_unlock(env, obj);
1096                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1097                         rc = 0;
1098                 else if (rc < 0)
1099                         return rc;
1100
1101                 if (rc != 0) {
1102                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1103                         rc = mdo_declare_xattr_set(env, obj, buf,
1104                                                    XATTR_NAME_ACL_ACCESS, 0,
1105                                                    handle);
1106                         if (rc)
1107                                 return rc;
1108                 }
1109         }
1110 #endif
1111
1112         return rc;
1113 }
1114
1115 /* set attr and LOV EA at once, return updated attr */
1116 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1117                  const struct md_attr *ma)
1118 {
1119         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1120         struct mdd_device *mdd = mdo2mdd(obj);
1121         struct thandle *handle;
1122         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1123         const struct lu_attr *la = &ma->ma_attr;
1124         int rc;
1125         ENTRY;
1126
1127         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1128         LASSERT((ma->ma_valid & MA_LOV) == 0);
1129         LASSERT((ma->ma_valid & MA_HSM) == 0);
1130         LASSERT((ma->ma_valid & MA_SOM) == 0);
1131
1132         *la_copy = ma->ma_attr;
1133         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1134         if (rc)
1135                 RETURN(rc);
1136
1137         /* setattr on "close" only change atime, or do nothing */
1138         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1139                 RETURN(0);
1140
1141         handle = mdd_trans_create(env, mdd);
1142         if (IS_ERR(handle))
1143                 RETURN(PTR_ERR(handle));
1144
1145         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1146         if (rc)
1147                 GOTO(stop, rc);
1148
1149         rc = mdd_trans_start(env, mdd, handle);
1150         if (rc)
1151                 GOTO(stop, rc);
1152
1153         /* permission changes may require sync operation */
1154         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1155                 handle->th_sync |= !!mdd->mdd_sync_permission;
1156
1157         if (la->la_valid & (LA_MTIME | LA_CTIME))
1158                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1159                        la->la_mtime, la->la_ctime);
1160
1161         if (la_copy->la_valid & LA_FLAGS) {
1162                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1163                 if (rc == 0)
1164                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1165         } else if (la_copy->la_valid) {            /* setattr */
1166                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1167         }
1168
1169         if (rc == 0)
1170                 rc = mdd_attr_set_changelog(env, obj, handle,
1171                                             la->la_valid);
1172 stop:
1173         mdd_trans_stop(env, mdd, rc, handle);
1174         RETURN(rc);
1175 }
1176
1177 static int mdd_xattr_sanity_check(const struct lu_env *env,
1178                                   struct mdd_object *obj)
1179 {
1180         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1181         struct md_ucred *uc     = md_ucred(env);
1182         int rc;
1183         ENTRY;
1184
1185         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1186                 RETURN(-EPERM);
1187
1188         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1189         if (rc)
1190                 RETURN(rc);
1191
1192         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1193             !mdd_capable(uc, CFS_CAP_FOWNER))
1194                 RETURN(-EPERM);
1195
1196         RETURN(rc);
1197 }
1198
1199 static int mdd_declare_xattr_set(const struct lu_env *env,
1200                                  struct mdd_device *mdd,
1201                                  struct mdd_object *obj,
1202                                  const struct lu_buf *buf,
1203                                  const char *name,
1204                                  struct thandle *handle)
1205 {
1206         int rc;
1207
1208         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1209         if (rc)
1210                 return rc;
1211
1212         /* Only record user xattr changes */
1213         if ((strncmp("user.", name, 5) == 0))
1214                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1215
1216         return rc;
1217 }
1218
1219 /**
1220  * The caller should guarantee to update the object ctime
1221  * after xattr_set if needed.
1222  */
1223 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1224                          const struct lu_buf *buf, const char *name,
1225                          int fl)
1226 {
1227         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1228         struct mdd_device *mdd = mdo2mdd(obj);
1229         struct thandle *handle;
1230         int  rc;
1231         ENTRY;
1232
1233         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1234                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1235                 RETURN(rc);
1236         }
1237
1238         rc = mdd_xattr_sanity_check(env, mdd_obj);
1239         if (rc)
1240                 RETURN(rc);
1241
1242         handle = mdd_trans_create(env, mdd);
1243         if (IS_ERR(handle))
1244                 RETURN(PTR_ERR(handle));
1245
1246         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1247         if (rc)
1248                 GOTO(stop, rc);
1249
1250         rc = mdd_trans_start(env, mdd, handle);
1251         if (rc)
1252                 GOTO(stop, rc);
1253
1254         /* security-replated changes may require sync */
1255         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1256                 handle->th_sync |= !!mdd->mdd_sync_permission;
1257
1258         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1259         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1260                            mdd_object_capa(env, mdd_obj));
1261         mdd_write_unlock(env, mdd_obj);
1262         if (rc)
1263                 GOTO(stop, rc);
1264
1265         /* Only record system & user xattr changes */
1266         if (strncmp(XATTR_USER_PREFIX, name,
1267                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1268                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1269                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1270                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1271                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1272                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1273                                               handle);
1274
1275 stop:
1276         mdd_trans_stop(env, mdd, rc, handle);
1277
1278         RETURN(rc);
1279 }
1280
1281 static int mdd_declare_xattr_del(const struct lu_env *env,
1282                                  struct mdd_device *mdd,
1283                                  struct mdd_object *obj,
1284                                  const char *name,
1285                                  struct thandle *handle)
1286 {
1287         int rc;
1288
1289         rc = mdo_declare_xattr_del(env, obj, name, handle);
1290         if (rc)
1291                 return rc;
1292
1293         /* Only record user xattr changes */
1294         if ((strncmp("user.", name, 5) == 0))
1295                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1296
1297         return rc;
1298 }
1299
1300 /**
1301  * The caller should guarantee to update the object ctime
1302  * after xattr_set if needed.
1303  */
1304 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1305                   const char *name)
1306 {
1307         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1308         struct mdd_device *mdd = mdo2mdd(obj);
1309         struct thandle *handle;
1310         int  rc;
1311         ENTRY;
1312
1313         rc = mdd_xattr_sanity_check(env, mdd_obj);
1314         if (rc)
1315                 RETURN(rc);
1316
1317         handle = mdd_trans_create(env, mdd);
1318         if (IS_ERR(handle))
1319                 RETURN(PTR_ERR(handle));
1320
1321         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1322         if (rc)
1323                 GOTO(stop, rc);
1324
1325         rc = mdd_trans_start(env, mdd, handle);
1326         if (rc)
1327                 GOTO(stop, rc);
1328
1329         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1330         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1331                            mdd_object_capa(env, mdd_obj));
1332         mdd_write_unlock(env, mdd_obj);
1333         if (rc)
1334                 GOTO(stop, rc);
1335
1336         /* Only record system & user xattr changes */
1337         if (strncmp(XATTR_USER_PREFIX, name,
1338                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1339                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1340                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1341                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1342                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1343                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1344                                               handle);
1345
1346 stop:
1347         mdd_trans_stop(env, mdd, rc, handle);
1348
1349         RETURN(rc);
1350 }
1351
1352 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1353                 struct mdd_object *child, struct lu_attr *attr)
1354 {
1355         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1356         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1357         struct dt_object *nc = mdd_object_child(child);
1358
1359         /* @hint will be initialized by underlying device. */
1360         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1361 }
1362
1363 /*
1364  * do NOT or the MAY_*'s, you'll get the weakest
1365  */
1366 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1367 {
1368         int res = 0;
1369
1370         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1371          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1372          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1373          * owner can write to a file even if it is marked readonly to hide
1374          * its brokenness. (bug 5781) */
1375         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1376                 struct md_ucred *uc = md_ucred(env);
1377
1378                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1379                     (la->la_uid == uc->mu_fsuid))
1380                         return 0;
1381         }
1382
1383         if (flags & FMODE_READ)
1384                 res |= MAY_READ;
1385         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1386                 res |= MAY_WRITE;
1387         if (flags & MDS_FMODE_EXEC)
1388                 res = MAY_EXEC;
1389         return res;
1390 }
1391
1392 static int mdd_open_sanity_check(const struct lu_env *env,
1393                                  struct mdd_object *obj, int flag)
1394 {
1395         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1396         int mode, rc;
1397         ENTRY;
1398
1399         /* EEXIST check */
1400         if (mdd_is_dead_obj(obj))
1401                 RETURN(-ENOENT);
1402
1403         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1404         if (rc)
1405                RETURN(rc);
1406
1407         if (S_ISLNK(tmp_la->la_mode))
1408                 RETURN(-ELOOP);
1409
1410         mode = accmode(env, tmp_la, flag);
1411
1412         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1413                 RETURN(-EISDIR);
1414
1415         if (!(flag & MDS_OPEN_CREATED)) {
1416                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1417                 if (rc)
1418                         RETURN(rc);
1419         }
1420
1421         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1422             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1423                 flag &= ~MDS_OPEN_TRUNC;
1424
1425         /* For writing append-only file must open it with append mode. */
1426         if (mdd_is_append(obj)) {
1427                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1428                         RETURN(-EPERM);
1429                 if (flag & MDS_OPEN_TRUNC)
1430                         RETURN(-EPERM);
1431         }
1432
1433 #if 0
1434         /*
1435          * Now, flag -- O_NOATIME does not be packed by client.
1436          */
1437         if (flag & O_NOATIME) {
1438                 struct md_ucred *uc = md_ucred(env);
1439
1440                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1441                     (uc->mu_valid == UCRED_NEW)) &&
1442                     (uc->mu_fsuid != tmp_la->la_uid) &&
1443                     !mdd_capable(uc, CFS_CAP_FOWNER))
1444                         RETURN(-EPERM);
1445         }
1446 #endif
1447
1448         RETURN(0);
1449 }
1450
1451 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1452                     int flags)
1453 {
1454         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1455         int rc = 0;
1456
1457         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1458
1459         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1460         if (rc == 0)
1461                 mdd_obj->mod_count++;
1462
1463         mdd_write_unlock(env, mdd_obj);
1464         return rc;
1465 }
1466
1467 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1468                             struct md_attr *ma, struct thandle *handle)
1469 {
1470         return mdo_declare_destroy(env, obj, handle);
1471 }
1472
1473 /* return md_attr back,
1474  * if it is last unlink then return lov ea + llog cookie*/
1475 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1476                     struct md_attr *ma, struct thandle *handle)
1477 {
1478         int rc;
1479         ENTRY;
1480
1481         rc = mdo_destroy(env, obj, handle);
1482
1483         RETURN(rc);
1484 }
1485
1486 static int mdd_declare_close(const struct lu_env *env,
1487                              struct mdd_object *obj,
1488                              struct md_attr *ma,
1489                              struct thandle *handle)
1490 {
1491         int rc;
1492
1493         rc = orph_declare_index_delete(env, obj, handle);
1494         if (rc)
1495                 return rc;
1496
1497         return mdo_declare_destroy(env, obj, handle);
1498 }
1499
1500 /*
1501  * No permission check is needed.
1502  */
1503 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1504                      struct md_attr *ma, int mode)
1505 {
1506         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1507         struct mdd_device *mdd = mdo2mdd(obj);
1508         struct thandle    *handle = NULL;
1509         int rc, is_orphan = 0;
1510         ENTRY;
1511
1512         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1513                 mdd_obj->mod_count--;
1514
1515                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1516                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1517                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1518                 RETURN(0);
1519         }
1520
1521         /* check without any lock */
1522         if (mdd_obj->mod_count == 1 &&
1523             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1524  again:
1525                 handle = mdd_trans_create(env, mdo2mdd(obj));
1526                 if (IS_ERR(handle))
1527                         RETURN(PTR_ERR(handle));
1528
1529                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1530                 if (rc)
1531                         GOTO(stop, rc);
1532
1533                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1534                 if (rc)
1535                         GOTO(stop, rc);
1536
1537                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1538                 if (rc)
1539                         GOTO(stop, rc);
1540         }
1541
1542         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1543         if (handle == NULL && mdd_obj->mod_count == 1 &&
1544             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1545                 mdd_write_unlock(env, mdd_obj);
1546                 goto again;
1547         }
1548
1549         /* release open count */
1550         mdd_obj->mod_count --;
1551
1552         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1553                 /* remove link to object from orphan index */
1554                 LASSERT(handle != NULL);
1555                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1556                 if (rc == 0) {
1557                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1558                                "list, OSS objects to be destroyed.\n",
1559                                PFID(mdd_object_fid(mdd_obj)));
1560                         is_orphan = 1;
1561                 } else {
1562                         CERROR("Object "DFID" can not be deleted from orphan "
1563                                 "list, maybe cause OST objects can not be "
1564                                 "destroyed (err: %d).\n",
1565                                 PFID(mdd_object_fid(mdd_obj)), rc);
1566                         /* If object was not deleted from orphan list, do not
1567                          * destroy OSS objects, which will be done when next
1568                          * recovery. */
1569                         GOTO(out, rc);
1570                 }
1571         }
1572
1573         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1574                         mdd_object_capa(env, mdd_obj));
1575         /* Object maybe not in orphan list originally, it is rare case for
1576          * mdd_finish_unlink() failure. */
1577         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
1578                 if (handle == NULL) {
1579                         handle = mdd_trans_create(env, mdo2mdd(obj));
1580                         if (IS_ERR(handle))
1581                                 GOTO(out, rc = PTR_ERR(handle));
1582
1583                         rc = mdo_declare_destroy(env, mdd_obj, handle);
1584                         if (rc)
1585                                 GOTO(out, rc);
1586
1587                         rc = mdd_declare_changelog_store(env, mdd,
1588                                         NULL, handle);
1589                         if (rc)
1590                                 GOTO(stop, rc);
1591
1592                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1593                         if (rc)
1594                                 GOTO(out, rc);
1595                 }
1596
1597                 rc = mdo_destroy(env, mdd_obj, handle);
1598
1599                 if (rc != 0)
1600                         CERROR("Error when prepare to delete Object "DFID" , "
1601                                "which will cause OST objects can not be "
1602                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1603         }
1604         EXIT;
1605
1606 out:
1607
1608         mdd_write_unlock(env, mdd_obj);
1609
1610         if (rc == 0 &&
1611             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1612             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1613                 if (handle == NULL) {
1614                         handle = mdd_trans_create(env, mdo2mdd(obj));
1615                         if (IS_ERR(handle))
1616                                 GOTO(stop, rc = IS_ERR(handle));
1617
1618                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1619                                                          handle);
1620                         if (rc)
1621                                 GOTO(stop, rc);
1622
1623                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1624                         if (rc)
1625                                 GOTO(stop, rc);
1626                 }
1627
1628                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1629                                          mdd_obj, handle);
1630         }
1631
1632 stop:
1633         if (handle != NULL)
1634                 mdd_trans_stop(env, mdd, rc, handle);
1635         return rc;
1636 }
1637
1638 /*
1639  * Permission check is done when open,
1640  * no need check again.
1641  */
1642 static int mdd_readpage_sanity_check(const struct lu_env *env,
1643                                      struct mdd_object *obj)
1644 {
1645         struct dt_object *next = mdd_object_child(obj);
1646         int rc;
1647         ENTRY;
1648
1649         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1650                 rc = 0;
1651         else
1652                 rc = -ENOTDIR;
1653
1654         RETURN(rc);
1655 }
1656
1657 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
1658                               int nob, const struct dt_it_ops *iops,
1659                               struct dt_it *it, __u32 attr, void *arg)
1660 {
1661         struct lu_dirpage       *dp = &lp->lp_dir;
1662         void                    *area = dp;
1663         int                      result;
1664         __u64                    hash = 0;
1665         struct lu_dirent        *ent;
1666         struct lu_dirent        *last = NULL;
1667         int                      first = 1;
1668
1669         memset(area, 0, sizeof (*dp));
1670         area += sizeof (*dp);
1671         nob  -= sizeof (*dp);
1672
1673         ent  = area;
1674         do {
1675                 int    len;
1676                 int    recsize;
1677
1678                 len = iops->key_size(env, it);
1679
1680                 /* IAM iterator can return record with zero len. */
1681                 if (len == 0)
1682                         goto next;
1683
1684                 hash = iops->store(env, it);
1685                 if (unlikely(first)) {
1686                         first = 0;
1687                         dp->ldp_hash_start = cpu_to_le64(hash);
1688                 }
1689
1690                 /* calculate max space required for lu_dirent */
1691                 recsize = lu_dirent_calc_size(len, attr);
1692
1693                 if (nob >= recsize) {
1694                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
1695                         if (result == -ESTALE)
1696                                 goto next;
1697                         if (result != 0)
1698                                 goto out;
1699
1700                         /* osd might not able to pack all attributes,
1701                          * so recheck rec length */
1702                         recsize = le16_to_cpu(ent->lde_reclen);
1703                 } else {
1704                         result = (last != NULL) ? 0 :-EINVAL;
1705                         goto out;
1706                 }
1707                 last = ent;
1708                 ent = (void *)ent + recsize;
1709                 nob -= recsize;
1710
1711 next:
1712                 result = iops->next(env, it);
1713                 if (result == -ESTALE)
1714                         goto next;
1715         } while (result == 0);
1716
1717 out:
1718         dp->ldp_hash_end = cpu_to_le64(hash);
1719         if (last != NULL) {
1720                 if (last->lde_hash == dp->ldp_hash_end)
1721                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
1722                 last->lde_reclen = 0; /* end mark */
1723         }
1724         if (result > 0)
1725                 /* end of directory */
1726                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
1727         if (result < 0)
1728                 CWARN("build page failed: %d!\n", result);
1729         return result;
1730 }
1731
1732 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1733                  const struct lu_rdpg *rdpg)
1734 {
1735         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1736         int rc;
1737         ENTRY;
1738
1739         if (mdd_object_exists(mdd_obj) == 0) {
1740                 CERROR("%s: object "DFID" not found: rc = -2\n",
1741                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1742                 return -ENOENT;
1743         }
1744
1745         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
1746         rc = mdd_readpage_sanity_check(env, mdd_obj);
1747         if (rc)
1748                 GOTO(out_unlock, rc);
1749
1750         if (mdd_is_dead_obj(mdd_obj)) {
1751                 struct page *pg;
1752                 struct lu_dirpage *dp;
1753
1754                 /*
1755                  * According to POSIX, please do not return any entry to client:
1756                  * even dot and dotdot should not be returned.
1757                  */
1758                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
1759                        PFID(mdd_object_fid(mdd_obj)));
1760
1761                 if (rdpg->rp_count <= 0)
1762                         GOTO(out_unlock, rc = -EFAULT);
1763                 LASSERT(rdpg->rp_pages != NULL);
1764
1765                 pg = rdpg->rp_pages[0];
1766                 dp = (struct lu_dirpage*)cfs_kmap(pg);
1767                 memset(dp, 0 , sizeof(struct lu_dirpage));
1768                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1769                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
1770                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1771                 cfs_kunmap(pg);
1772                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
1773         }
1774
1775         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
1776                            mdd_dir_page_build, NULL);
1777         if (rc >= 0) {
1778                 struct lu_dirpage       *dp;
1779
1780                 dp = cfs_kmap(rdpg->rp_pages[0]);
1781                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1782                 if (rc == 0) {
1783                         /*
1784                          * No pages were processed, mark this for first page
1785                          * and send back.
1786                          */
1787                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1788                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
1789                 }
1790                 cfs_kunmap(rdpg->rp_pages[0]);
1791         }
1792
1793         GOTO(out_unlock, rc);
1794 out_unlock:
1795         mdd_read_unlock(env, mdd_obj);
1796         return rc;
1797 }
1798
1799 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
1800 {
1801         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1802
1803         if (mdd_object_exists(mdd_obj) == 0) {
1804                 CERROR("%s: object "DFID" not found: rc = -2\n",
1805                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1806                 return -ENOENT;
1807         }
1808         return dt_object_sync(env, mdd_object_child(mdd_obj));
1809 }
1810
1811 const struct md_object_operations mdd_obj_ops = {
1812         .moo_permission    = mdd_permission,
1813         .moo_attr_get      = mdd_attr_get,
1814         .moo_attr_set      = mdd_attr_set,
1815         .moo_xattr_get     = mdd_xattr_get,
1816         .moo_xattr_set     = mdd_xattr_set,
1817         .moo_xattr_list    = mdd_xattr_list,
1818         .moo_xattr_del     = mdd_xattr_del,
1819         .moo_open          = mdd_open,
1820         .moo_close         = mdd_close,
1821         .moo_readpage      = mdd_readpage,
1822         .moo_readlink      = mdd_readlink,
1823         .moo_changelog     = mdd_changelog,
1824         .moo_capa_get      = mdd_capa_get,
1825         .moo_object_sync   = mdd_object_sync,
1826         .moo_path          = mdd_path,
1827 };