Whamcloud - gitweb
LU-2034 changelog: redo changelog using OSD llog
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct lu_object *mdd_object_alloc(const struct lu_env *env,
184                                    const struct lu_object_header *hdr,
185                                    struct lu_device *d)
186 {
187         struct mdd_object *mdd_obj;
188
189         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
190         if (mdd_obj != NULL) {
191                 struct lu_object *o;
192
193                 o = mdd2lu_obj(mdd_obj);
194                 lu_object_init(o, NULL, d);
195                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
196                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
197                 mdd_obj->mod_count = 0;
198                 o->lo_ops = &mdd_lu_obj_ops;
199                 return o;
200         } else {
201                 return NULL;
202         }
203 }
204
205 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
206                            const struct lu_object_conf *unused)
207 {
208         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
209         struct mdd_object *mdd_obj = lu2mdd_obj(o);
210         struct lu_object  *below;
211         struct lu_device  *under;
212         ENTRY;
213
214         mdd_obj->mod_cltime = 0;
215         under = &d->mdd_child->dd_lu_dev;
216         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
217         mdd_pdlock_init(mdd_obj);
218         if (below == NULL)
219                 RETURN(-ENOMEM);
220
221         lu_object_add(o, below);
222
223         RETURN(0);
224 }
225
226 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
227 {
228         if (lu_object_exists(o))
229                 return mdd_get_flags(env, lu2mdd_obj(o));
230         else
231                 return 0;
232 }
233
234 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
235 {
236         struct mdd_object *mdd = lu2mdd_obj(o);
237
238         lu_object_fini(o);
239         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
240 }
241
242 static int mdd_object_print(const struct lu_env *env, void *cookie,
243                             lu_printer_t p, const struct lu_object *o)
244 {
245         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
246         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
247                     "valid=%x, cltime="LPU64", flags=%lx)",
248                     mdd, mdd->mod_count, mdd->mod_valid,
249                     mdd->mod_cltime, mdd->mod_flags);
250 }
251
252 static const struct lu_object_operations mdd_lu_obj_ops = {
253         .loo_object_init    = mdd_object_init,
254         .loo_object_start   = mdd_object_start,
255         .loo_object_free    = mdd_object_free,
256         .loo_object_print   = mdd_object_print,
257 };
258
259 struct mdd_object *mdd_object_find(const struct lu_env *env,
260                                    struct mdd_device *d,
261                                    const struct lu_fid *f)
262 {
263         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
264 }
265
266 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
267                         const char *path, struct lu_fid *fid)
268 {
269         struct lu_buf *buf;
270         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
271         struct mdd_object *obj;
272         struct lu_name *lname = &mdd_env_info(env)->mti_name;
273         char *name;
274         int rc = 0;
275         ENTRY;
276
277         /* temp buffer for path element */
278         buf = mdd_buf_alloc(env, PATH_MAX);
279         if (buf->lb_buf == NULL)
280                 RETURN(-ENOMEM);
281
282         lname->ln_name = name = buf->lb_buf;
283         lname->ln_namelen = 0;
284         *f = mdd->mdd_root_fid;
285
286         while(1) {
287                 while (*path == '/')
288                         path++;
289                 if (*path == '\0')
290                         break;
291                 while (*path != '/' && *path != '\0') {
292                         *name = *path;
293                         path++;
294                         name++;
295                         lname->ln_namelen++;
296                 }
297
298                 *name = '\0';
299                 /* find obj corresponding to fid */
300                 obj = mdd_object_find(env, mdd, f);
301                 if (obj == NULL)
302                         GOTO(out, rc = -EREMOTE);
303                 if (IS_ERR(obj))
304                         GOTO(out, rc = PTR_ERR(obj));
305                 /* get child fid from parent and name */
306                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
307                 mdd_object_put(env, obj);
308                 if (rc)
309                         break;
310
311                 name = buf->lb_buf;
312                 lname->ln_namelen = 0;
313         }
314
315         if (!rc)
316                 *fid = *f;
317 out:
318         RETURN(rc);
319 }
320
321 /** The maximum depth that fid2path() will search.
322  * This is limited only because we want to store the fids for
323  * historical path lookup purposes.
324  */
325 #define MAX_PATH_DEPTH 100
326
327 /** mdd_path() lookup structure. */
328 struct path_lookup_info {
329         __u64                pli_recno;        /**< history point */
330         __u64                pli_currec;       /**< current record */
331         struct lu_fid        pli_fid;
332         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
333         struct mdd_object   *pli_mdd_obj;
334         char                *pli_path;         /**< full path */
335         int                  pli_pathlen;
336         int                  pli_linkno;       /**< which hardlink to follow */
337         int                  pli_fidcount;     /**< number of \a pli_fids */
338 };
339
340 static int mdd_path_current(const struct lu_env *env,
341                             struct path_lookup_info *pli)
342 {
343         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
344         struct mdd_object *mdd_obj;
345         struct lu_buf     *buf = NULL;
346         struct link_ea_header *leh;
347         struct link_ea_entry  *lee;
348         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
349         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
350         char *ptr;
351         int reclen;
352         int rc;
353         ENTRY;
354
355         ptr = pli->pli_path + pli->pli_pathlen - 1;
356         *ptr = 0;
357         --ptr;
358         pli->pli_fidcount = 0;
359         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
360
361         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
362                 mdd_obj = mdd_object_find(env, mdd,
363                                           &pli->pli_fids[pli->pli_fidcount]);
364                 if (mdd_obj == NULL)
365                         GOTO(out, rc = -EREMOTE);
366                 if (IS_ERR(mdd_obj))
367                         GOTO(out, rc = PTR_ERR(mdd_obj));
368                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
369                 if (rc <= 0) {
370                         mdd_object_put(env, mdd_obj);
371                         if (rc == -1)
372                                 rc = -EREMOTE;
373                         else if (rc == 0)
374                                 /* Do I need to error out here? */
375                                 rc = -ENOENT;
376                         GOTO(out, rc);
377                 }
378
379                 /* Get parent fid and object name */
380                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
381                 buf = mdd_links_get(env, mdd_obj);
382                 mdd_read_unlock(env, mdd_obj);
383                 mdd_object_put(env, mdd_obj);
384                 if (IS_ERR(buf))
385                         GOTO(out, rc = PTR_ERR(buf));
386
387                 leh = buf->lb_buf;
388                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
389                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
390
391                 /* If set, use link #linkno for path lookup, otherwise use
392                    link #0.  Only do this for the final path element. */
393                 if ((pli->pli_fidcount == 0) &&
394                     (pli->pli_linkno < leh->leh_reccount)) {
395                         int count;
396                         for (count = 0; count < pli->pli_linkno; count++) {
397                                 lee = (struct link_ea_entry *)
398                                      ((char *)lee + reclen);
399                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
400                         }
401                         if (pli->pli_linkno < leh->leh_reccount - 1)
402                                 /* indicate to user there are more links */
403                                 pli->pli_linkno++;
404                 }
405
406                 /* Pack the name in the end of the buffer */
407                 ptr -= tmpname->ln_namelen;
408                 if (ptr - 1 <= pli->pli_path)
409                         GOTO(out, rc = -EOVERFLOW);
410                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
411                 *(--ptr) = '/';
412
413                 /* Store the parent fid for historic lookup */
414                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
415                         GOTO(out, rc = -EOVERFLOW);
416                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
417         }
418
419         /* Verify that our path hasn't changed since we started the lookup.
420            Record the current index, and verify the path resolves to the
421            same fid. If it does, then the path is correct as of this index. */
422         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
423         pli->pli_currec = mdd->mdd_cl.mc_index;
424         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
425         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
426         if (rc) {
427                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
428                 GOTO (out, rc = -EAGAIN);
429         }
430         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
431                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
432                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
433                        PFID(&pli->pli_fid));
434                 GOTO(out, rc = -EAGAIN);
435         }
436         ptr++; /* skip leading / */
437         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
438
439         EXIT;
440 out:
441         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
442                 /* if we vmalloced a large buffer drop it */
443                 mdd_buf_put(buf);
444
445         return rc;
446 }
447
448 static int mdd_path_historic(const struct lu_env *env,
449                              struct path_lookup_info *pli)
450 {
451         return 0;
452 }
453
454 /* Returns the full path to this fid, as of changelog record recno. */
455 static int mdd_path(const struct lu_env *env, struct md_object *obj,
456                     char *path, int pathlen, __u64 *recno, int *linkno)
457 {
458         struct path_lookup_info *pli;
459         int tries = 3;
460         int rc = -EAGAIN;
461         ENTRY;
462
463         if (pathlen < 3)
464                 RETURN(-EOVERFLOW);
465
466         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
467                 path[0] = '\0';
468                 RETURN(0);
469         }
470
471         OBD_ALLOC_PTR(pli);
472         if (pli == NULL)
473                 RETURN(-ENOMEM);
474
475         pli->pli_mdd_obj = md2mdd_obj(obj);
476         pli->pli_recno = *recno;
477         pli->pli_path = path;
478         pli->pli_pathlen = pathlen;
479         pli->pli_linkno = *linkno;
480
481         /* Retry multiple times in case file is being moved */
482         while (tries-- && rc == -EAGAIN)
483                 rc = mdd_path_current(env, pli);
484
485         /* For historical path lookup, the current links may not have existed
486          * at "recno" time.  We must switch over to earlier links/parents
487          * by using the changelog records.  If the earlier parent doesn't
488          * exist, we must search back through the changelog to reconstruct
489          * its parents, then check if it exists, etc.
490          * We may ignore this problem for the initial implementation and
491          * state that an "original" hardlink must still exist for us to find
492          * historic path name. */
493         if (pli->pli_recno != -1) {
494                 rc = mdd_path_historic(env, pli);
495         } else {
496                 *recno = pli->pli_currec;
497                 /* Return next link index to caller */
498                 *linkno = pli->pli_linkno;
499         }
500
501         OBD_FREE_PTR(pli);
502
503         RETURN (rc);
504 }
505
506 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
507 {
508         struct lu_attr *la = &mdd_env_info(env)->mti_la;
509         int rc;
510
511         ENTRY;
512         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
513         if (rc == 0) {
514                 mdd_flags_xlate(obj, la->la_flags);
515         }
516         RETURN(rc);
517 }
518
519 /*
520  * No permission check is needed.
521  */
522 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
523                  struct md_attr *ma)
524 {
525         int rc;
526         ENTRY;
527
528         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
529                           mdd_object_capa(env, md2mdd_obj(obj)));
530         RETURN(rc);
531 }
532
533 /*
534  * No permission check is needed.
535  */
536 static int mdd_xattr_get(const struct lu_env *env,
537                          struct md_object *obj, struct lu_buf *buf,
538                          const char *name)
539 {
540         struct mdd_object *mdd_obj = md2mdd_obj(obj);
541         int rc;
542
543         ENTRY;
544
545         if (mdd_object_exists(mdd_obj) == 0) {
546                 CERROR("%s: object "DFID" not found: rc = -2\n",
547                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
548                 return -ENOENT;
549         }
550
551         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
552         rc = mdo_xattr_get(env, mdd_obj, buf, name,
553                            mdd_object_capa(env, mdd_obj));
554         mdd_read_unlock(env, mdd_obj);
555
556         RETURN(rc);
557 }
558
559 /*
560  * Permission check is done when open,
561  * no need check again.
562  */
563 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
564                         struct lu_buf *buf)
565 {
566         struct mdd_object *mdd_obj = md2mdd_obj(obj);
567         struct dt_object  *next;
568         loff_t             pos = 0;
569         int                rc;
570         ENTRY;
571
572         if (mdd_object_exists(mdd_obj) == 0) {
573                 CERROR("%s: object "DFID" not found: rc = -2\n",
574                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
575                 return -ENOENT;
576         }
577
578         next = mdd_object_child(mdd_obj);
579         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
580         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
581                                          mdd_object_capa(env, mdd_obj));
582         mdd_read_unlock(env, mdd_obj);
583         RETURN(rc);
584 }
585
586 /*
587  * No permission check is needed.
588  */
589 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
590                           struct lu_buf *buf)
591 {
592         struct mdd_object *mdd_obj = md2mdd_obj(obj);
593         int rc;
594
595         ENTRY;
596
597         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
598         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
599         mdd_read_unlock(env, mdd_obj);
600
601         RETURN(rc);
602 }
603
604 int mdd_declare_object_create_internal(const struct lu_env *env,
605                                        struct mdd_object *p,
606                                        struct mdd_object *c,
607                                        struct lu_attr *attr,
608                                        struct thandle *handle,
609                                        const struct md_op_spec *spec)
610 {
611         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
612         const struct dt_index_features *feat = spec->sp_feat;
613         int rc;
614         ENTRY;
615
616         if (feat != &dt_directory_features && feat != NULL) {
617                 dof->dof_type = DFT_INDEX;
618                 dof->u.dof_idx.di_feat = feat;
619
620         } else {
621                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
622                 if (dof->dof_type == DFT_REGULAR) {
623                         dof->u.dof_reg.striped =
624                                 md_should_create(spec->sp_cr_flags);
625                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
626                                 dof->u.dof_reg.striped = 0;
627                         /* is this replay? */
628                         if (spec->no_create)
629                                 dof->u.dof_reg.striped = 0;
630                 }
631         }
632
633         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
634
635         RETURN(rc);
636 }
637
638 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
639                                struct mdd_object *c, struct lu_attr *attr,
640                                struct thandle *handle,
641                                const struct md_op_spec *spec)
642 {
643         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
644         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
645         int rc;
646         ENTRY;
647
648         LASSERT(!mdd_object_exists(c));
649
650         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
651
652         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
653
654         RETURN(rc);
655 }
656
657 /**
658  * Make sure the ctime is increased only.
659  */
660 static inline int mdd_attr_check(const struct lu_env *env,
661                                  struct mdd_object *obj,
662                                  struct lu_attr *attr)
663 {
664         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
665         int rc;
666         ENTRY;
667
668         if (attr->la_valid & LA_CTIME) {
669                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
670                 if (rc)
671                         RETURN(rc);
672
673                 if (attr->la_ctime < tmp_la->la_ctime)
674                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
675                 else if (attr->la_valid == LA_CTIME &&
676                          attr->la_ctime == tmp_la->la_ctime)
677                         attr->la_valid &= ~LA_CTIME;
678         }
679         RETURN(0);
680 }
681
682 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
683                           struct lu_attr *attr, struct thandle *handle,
684                           int needacl)
685 {
686         int rc;
687         ENTRY;
688
689         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
690 #ifdef CONFIG_FS_POSIX_ACL
691         if (!rc && (attr->la_valid & LA_MODE) && needacl)
692                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
693 #endif
694         RETURN(rc);
695 }
696
697 int mdd_attr_check_set_internal(const struct lu_env *env,
698                                 struct mdd_object *obj, struct lu_attr *attr,
699                                 struct thandle *handle, int needacl)
700 {
701         int rc;
702         ENTRY;
703
704         rc = mdd_attr_check(env, obj, attr);
705         if (rc)
706                 RETURN(rc);
707
708         if (attr->la_valid)
709                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
710         RETURN(rc);
711 }
712
713 /*
714  * This gives the same functionality as the code between
715  * sys_chmod and inode_setattr
716  * chown_common and inode_setattr
717  * utimes and inode_setattr
718  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
719  */
720 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
721                         struct lu_attr *la, const unsigned long flags)
722 {
723         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
724         struct md_ucred  *uc;
725         int               rc;
726         ENTRY;
727
728         if (!la->la_valid)
729                 RETURN(0);
730
731         /* Do not permit change file type */
732         if (la->la_valid & LA_TYPE)
733                 RETURN(-EPERM);
734
735         /* They should not be processed by setattr */
736         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
737                 RETURN(-EPERM);
738
739         /* export destroy does not have ->le_ses, but we may want
740          * to drop LUSTRE_SOM_FL. */
741         if (!env->le_ses)
742                 RETURN(0);
743
744         uc = md_ucred(env);
745
746         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
747         if (rc)
748                 RETURN(rc);
749
750         if (la->la_valid == LA_CTIME) {
751                 if (!(flags & MDS_PERM_BYPASS))
752                         /* This is only for set ctime when rename's source is
753                          * on remote MDS. */
754                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
755                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
756                         la->la_valid &= ~LA_CTIME;
757                 RETURN(rc);
758         }
759
760         if (la->la_valid == LA_ATIME) {
761                 /* This is atime only set for read atime update on close. */
762                 if (la->la_atime >= tmp_la->la_atime &&
763                     la->la_atime < (tmp_la->la_atime +
764                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
765                         la->la_valid &= ~LA_ATIME;
766                 RETURN(0);
767         }
768
769         /* Check if flags change. */
770         if (la->la_valid & LA_FLAGS) {
771                 unsigned int oldflags = 0;
772                 unsigned int newflags = la->la_flags &
773                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
774
775                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
776                     !mdd_capable(uc, CFS_CAP_FOWNER))
777                         RETURN(-EPERM);
778
779                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
780                  * only be changed by the relevant capability. */
781                 if (mdd_is_immutable(obj))
782                         oldflags |= LUSTRE_IMMUTABLE_FL;
783                 if (mdd_is_append(obj))
784                         oldflags |= LUSTRE_APPEND_FL;
785                 if ((oldflags ^ newflags) &&
786                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
787                         RETURN(-EPERM);
788
789                 if (!S_ISDIR(tmp_la->la_mode))
790                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
791         }
792
793         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
794             (la->la_valid & ~LA_FLAGS) &&
795             !(flags & MDS_PERM_BYPASS))
796                 RETURN(-EPERM);
797
798         /* Check for setting the obj time. */
799         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
800             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
801                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
802                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
803                         rc = mdd_permission_internal(env, obj, tmp_la,
804                                                      MAY_WRITE);
805                         if (rc)
806                                 RETURN(rc);
807                 }
808         }
809
810         if (la->la_valid & LA_KILL_SUID) {
811                 la->la_valid &= ~LA_KILL_SUID;
812                 if ((tmp_la->la_mode & S_ISUID) &&
813                     !(la->la_valid & LA_MODE)) {
814                         la->la_mode = tmp_la->la_mode;
815                         la->la_valid |= LA_MODE;
816                 }
817                 la->la_mode &= ~S_ISUID;
818         }
819
820         if (la->la_valid & LA_KILL_SGID) {
821                 la->la_valid &= ~LA_KILL_SGID;
822                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
823                                         (S_ISGID | S_IXGRP)) &&
824                     !(la->la_valid & LA_MODE)) {
825                         la->la_mode = tmp_la->la_mode;
826                         la->la_valid |= LA_MODE;
827                 }
828                 la->la_mode &= ~S_ISGID;
829         }
830
831         /* Make sure a caller can chmod. */
832         if (la->la_valid & LA_MODE) {
833                 if (!(flags & MDS_PERM_BYPASS) &&
834                     (uc->mu_fsuid != tmp_la->la_uid) &&
835                     !mdd_capable(uc, CFS_CAP_FOWNER))
836                         RETURN(-EPERM);
837
838                 if (la->la_mode == (cfs_umode_t) -1)
839                         la->la_mode = tmp_la->la_mode;
840                 else
841                         la->la_mode = (la->la_mode & S_IALLUGO) |
842                                       (tmp_la->la_mode & ~S_IALLUGO);
843
844                 /* Also check the setgid bit! */
845                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
846                                        la->la_gid : tmp_la->la_gid) &&
847                     !mdd_capable(uc, CFS_CAP_FSETID))
848                         la->la_mode &= ~S_ISGID;
849         } else {
850                la->la_mode = tmp_la->la_mode;
851         }
852
853         /* Make sure a caller can chown. */
854         if (la->la_valid & LA_UID) {
855                 if (la->la_uid == (uid_t) -1)
856                         la->la_uid = tmp_la->la_uid;
857                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
858                     (la->la_uid != tmp_la->la_uid)) &&
859                     !mdd_capable(uc, CFS_CAP_CHOWN))
860                         RETURN(-EPERM);
861
862                 /* If the user or group of a non-directory has been
863                  * changed by a non-root user, remove the setuid bit.
864                  * 19981026 David C Niemi <niemi@tux.org>
865                  *
866                  * Changed this to apply to all users, including root,
867                  * to avoid some races. This is the behavior we had in
868                  * 2.0. The check for non-root was definitely wrong
869                  * for 2.2 anyway, as it should have been using
870                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
871                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
872                     !S_ISDIR(tmp_la->la_mode)) {
873                         la->la_mode &= ~S_ISUID;
874                         la->la_valid |= LA_MODE;
875                 }
876         }
877
878         /* Make sure caller can chgrp. */
879         if (la->la_valid & LA_GID) {
880                 if (la->la_gid == (gid_t) -1)
881                         la->la_gid = tmp_la->la_gid;
882                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
883                     ((la->la_gid != tmp_la->la_gid) &&
884                     !lustre_in_group_p(uc, la->la_gid))) &&
885                     !mdd_capable(uc, CFS_CAP_CHOWN))
886                         RETURN(-EPERM);
887
888                 /* Likewise, if the user or group of a non-directory
889                  * has been changed by a non-root user, remove the
890                  * setgid bit UNLESS there is no group execute bit
891                  * (this would be a file marked for mandatory
892                  * locking).  19981026 David C Niemi <niemi@tux.org>
893                  *
894                  * Removed the fsuid check (see the comment above) --
895                  * 19990830 SD. */
896                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
897                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
898                         la->la_mode &= ~S_ISGID;
899                         la->la_valid |= LA_MODE;
900                 }
901         }
902
903         /* For both Size-on-MDS case and truncate case,
904          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
905          * We distinguish them by "flags & MDS_SOM".
906          * For SOM case, it is true, the MAY_WRITE perm has been checked
907          * when open, no need check again. For truncate case, it is false,
908          * the MAY_WRITE perm should be checked here. */
909         if (flags & MDS_SOM) {
910                 /* For the "Size-on-MDS" setattr update, merge coming
911                  * attributes with the set in the inode. BUG 10641 */
912                 if ((la->la_valid & LA_ATIME) &&
913                     (la->la_atime <= tmp_la->la_atime))
914                         la->la_valid &= ~LA_ATIME;
915
916                 /* OST attributes do not have a priority over MDS attributes,
917                  * so drop times if ctime is equal. */
918                 if ((la->la_valid & LA_CTIME) &&
919                     (la->la_ctime <= tmp_la->la_ctime))
920                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
921         } else {
922                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
923                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
924                               (uc->mu_fsuid == tmp_la->la_uid)) &&
925                             !(flags & MDS_PERM_BYPASS)) {
926                                 rc = mdd_permission_internal(env, obj,
927                                                              tmp_la, MAY_WRITE);
928                                 if (rc)
929                                         RETURN(rc);
930                         }
931                 }
932                 if (la->la_valid & LA_CTIME) {
933                         /* The pure setattr, it has the priority over what is
934                          * already set, do not drop it if ctime is equal. */
935                         if (la->la_ctime < tmp_la->la_ctime)
936                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
937                                                   LA_CTIME);
938                 }
939         }
940
941         RETURN(0);
942 }
943
944 /** Store a data change changelog record
945  * If this fails, we must fail the whole transaction; we don't
946  * want the change to commit without the log entry.
947  * \param mdd_obj - mdd_object of change
948  * \param handle - transacion handle
949  */
950 static int mdd_changelog_data_store(const struct lu_env *env,
951                                     struct mdd_device *mdd,
952                                     enum changelog_rec_type type,
953                                     int flags, struct mdd_object *mdd_obj,
954                                     struct thandle *handle)
955 {
956         const struct lu_fid             *tfid = mdo2fid(mdd_obj);
957         struct llog_changelog_rec       *rec;
958         struct lu_buf                   *buf;
959         int                              reclen;
960         int                              rc;
961
962         /* Not recording */
963         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
964                 RETURN(0);
965         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
966                 RETURN(0);
967
968         LASSERT(mdd_obj != NULL);
969         LASSERT(handle != NULL);
970
971         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
972             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
973                 /* Don't need multiple updates in this log */
974                 /* Don't check under lock - no big deal if we get an extra
975                    entry */
976                 RETURN(0);
977         }
978
979         reclen = llog_data_len(sizeof(*rec));
980         buf = mdd_buf_alloc(env, reclen);
981         if (buf->lb_buf == NULL)
982                 RETURN(-ENOMEM);
983         rec = buf->lb_buf;
984
985         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
986         rec->cr.cr_type = (__u32)type;
987         rec->cr.cr_tfid = *tfid;
988         rec->cr.cr_namelen = 0;
989         mdd_obj->mod_cltime = cfs_time_current_64();
990
991         rc = mdd_changelog_store(env, mdd, rec, handle);
992
993         RETURN(rc);
994 }
995
996 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
997                   int flags, struct md_object *obj)
998 {
999         struct thandle *handle;
1000         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1001         struct mdd_device *mdd = mdo2mdd(obj);
1002         int rc;
1003         ENTRY;
1004
1005         handle = mdd_trans_create(env, mdd);
1006         if (IS_ERR(handle))
1007                 RETURN(PTR_ERR(handle));
1008
1009         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1010         if (rc)
1011                 GOTO(stop, rc);
1012
1013         rc = mdd_trans_start(env, mdd, handle);
1014         if (rc)
1015                 GOTO(stop, rc);
1016
1017         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1018                                       handle);
1019
1020 stop:
1021         mdd_trans_stop(env, mdd, rc, handle);
1022
1023         RETURN(rc);
1024 }
1025
1026 /**
1027  * Save LMA extended attributes with data from \a ma.
1028  *
1029  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1030  * not, LMA EA will be first read from disk, modified and write back.
1031  *
1032  */
1033 /* Precedence for choosing record type when multiple
1034  * attributes change: setattr > mtime > ctime > atime
1035  * (ctime changes when mtime does, plus chmod/chown.
1036  * atime and ctime are independent.) */
1037 static int mdd_attr_set_changelog(const struct lu_env *env,
1038                                   struct md_object *obj, struct thandle *handle,
1039                                   __u64 valid)
1040 {
1041         struct mdd_device *mdd = mdo2mdd(obj);
1042         int bits, type = 0;
1043
1044         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1045         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1046         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1047         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1048         bits = bits & mdd->mdd_cl.mc_mask;
1049         if (bits == 0)
1050                 return 0;
1051
1052         /* The record type is the lowest non-masked set bit */
1053         while (bits && ((bits & 1) == 0)) {
1054                 bits = bits >> 1;
1055                 type++;
1056         }
1057
1058         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1059         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1060                                         md2mdd_obj(obj), handle);
1061 }
1062
1063 static int mdd_declare_attr_set(const struct lu_env *env,
1064                                 struct mdd_device *mdd,
1065                                 struct mdd_object *obj,
1066                                 const struct lu_attr *attr,
1067                                 struct thandle *handle)
1068 {
1069         int rc;
1070
1071         rc = mdo_declare_attr_set(env, obj, attr, handle);
1072         if (rc)
1073                 return rc;
1074
1075 #ifdef CONFIG_FS_POSIX_ACL
1076         if (attr->la_valid & LA_MODE) {
1077                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1078                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1079                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1080                 mdd_read_unlock(env, obj);
1081                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1082                         rc = 0;
1083                 else if (rc < 0)
1084                         return rc;
1085
1086                 if (rc != 0) {
1087                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1088                         rc = mdo_declare_xattr_set(env, obj, buf,
1089                                                    XATTR_NAME_ACL_ACCESS, 0,
1090                                                    handle);
1091                         if (rc)
1092                                 return rc;
1093                 }
1094         }
1095 #endif
1096
1097         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1098         return rc;
1099 }
1100
1101 /* set attr and LOV EA at once, return updated attr */
1102 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1103                  const struct md_attr *ma)
1104 {
1105         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1106         struct mdd_device *mdd = mdo2mdd(obj);
1107         struct thandle *handle;
1108         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1109         const struct lu_attr *la = &ma->ma_attr;
1110         int rc;
1111         ENTRY;
1112
1113         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1114         LASSERT((ma->ma_valid & MA_LOV) == 0);
1115         LASSERT((ma->ma_valid & MA_HSM) == 0);
1116         LASSERT((ma->ma_valid & MA_SOM) == 0);
1117
1118         *la_copy = ma->ma_attr;
1119         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1120         if (rc)
1121                 RETURN(rc);
1122
1123         /* setattr on "close" only change atime, or do nothing */
1124         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1125                 RETURN(0);
1126
1127         handle = mdd_trans_create(env, mdd);
1128         if (IS_ERR(handle))
1129                 RETURN(PTR_ERR(handle));
1130
1131         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1132         if (rc)
1133                 GOTO(stop, rc);
1134
1135         rc = mdd_trans_start(env, mdd, handle);
1136         if (rc)
1137                 GOTO(stop, rc);
1138
1139         /* permission changes may require sync operation */
1140         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1141                 handle->th_sync |= !!mdd->mdd_sync_permission;
1142
1143         if (la->la_valid & (LA_MTIME | LA_CTIME))
1144                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1145                        la->la_mtime, la->la_ctime);
1146
1147         if (la_copy->la_valid & LA_FLAGS) {
1148                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1149                 if (rc == 0)
1150                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1151         } else if (la_copy->la_valid) {            /* setattr */
1152                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1153         }
1154
1155         if (rc == 0)
1156                 rc = mdd_attr_set_changelog(env, obj, handle,
1157                                             la->la_valid);
1158 stop:
1159         mdd_trans_stop(env, mdd, rc, handle);
1160         RETURN(rc);
1161 }
1162
1163 static int mdd_xattr_sanity_check(const struct lu_env *env,
1164                                   struct mdd_object *obj)
1165 {
1166         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1167         struct md_ucred *uc     = md_ucred(env);
1168         int rc;
1169         ENTRY;
1170
1171         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1172                 RETURN(-EPERM);
1173
1174         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1175         if (rc)
1176                 RETURN(rc);
1177
1178         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1179             !mdd_capable(uc, CFS_CAP_FOWNER))
1180                 RETURN(-EPERM);
1181
1182         RETURN(rc);
1183 }
1184
1185 static int mdd_declare_xattr_set(const struct lu_env *env,
1186                                  struct mdd_device *mdd,
1187                                  struct mdd_object *obj,
1188                                  const struct lu_buf *buf,
1189                                  const char *name,
1190                                  struct thandle *handle)
1191 {
1192         int rc;
1193
1194         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1195         if (rc)
1196                 return rc;
1197
1198         /* Only record user xattr changes */
1199         if ((strncmp("user.", name, 5) == 0))
1200                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1201
1202         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1203         return rc;
1204 }
1205
1206 /**
1207  * The caller should guarantee to update the object ctime
1208  * after xattr_set if needed.
1209  */
1210 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1211                          const struct lu_buf *buf, const char *name,
1212                          int fl)
1213 {
1214         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1215         struct mdd_device *mdd = mdo2mdd(obj);
1216         struct thandle *handle;
1217         int  rc;
1218         ENTRY;
1219
1220         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1221                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1222                 RETURN(rc);
1223         }
1224
1225         rc = mdd_xattr_sanity_check(env, mdd_obj);
1226         if (rc)
1227                 RETURN(rc);
1228
1229         handle = mdd_trans_create(env, mdd);
1230         if (IS_ERR(handle))
1231                 RETURN(PTR_ERR(handle));
1232
1233         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1234         if (rc)
1235                 GOTO(stop, rc);
1236
1237         rc = mdd_trans_start(env, mdd, handle);
1238         if (rc)
1239                 GOTO(stop, rc);
1240
1241         /* security-replated changes may require sync */
1242         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1243                 handle->th_sync |= !!mdd->mdd_sync_permission;
1244
1245         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1246         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1247                            mdd_object_capa(env, mdd_obj));
1248         mdd_write_unlock(env, mdd_obj);
1249         if (rc)
1250                 GOTO(stop, rc);
1251
1252         /* Only record system & user xattr changes */
1253         if (strncmp(XATTR_USER_PREFIX, name,
1254                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1255                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1256                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1257                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1258                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1259                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1260                                               handle);
1261
1262 stop:
1263         mdd_trans_stop(env, mdd, rc, handle);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int mdd_declare_xattr_del(const struct lu_env *env,
1269                                  struct mdd_device *mdd,
1270                                  struct mdd_object *obj,
1271                                  const char *name,
1272                                  struct thandle *handle)
1273 {
1274         int rc;
1275
1276         rc = mdo_declare_xattr_del(env, obj, name, handle);
1277         if (rc)
1278                 return rc;
1279
1280         /* Only record user xattr changes */
1281         if ((strncmp("user.", name, 5) == 0))
1282                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1283
1284         return rc;
1285 }
1286
1287 /**
1288  * The caller should guarantee to update the object ctime
1289  * after xattr_set if needed.
1290  */
1291 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1292                   const char *name)
1293 {
1294         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1295         struct mdd_device *mdd = mdo2mdd(obj);
1296         struct thandle *handle;
1297         int  rc;
1298         ENTRY;
1299
1300         rc = mdd_xattr_sanity_check(env, mdd_obj);
1301         if (rc)
1302                 RETURN(rc);
1303
1304         handle = mdd_trans_create(env, mdd);
1305         if (IS_ERR(handle))
1306                 RETURN(PTR_ERR(handle));
1307
1308         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1309         if (rc)
1310                 GOTO(stop, rc);
1311
1312         rc = mdd_trans_start(env, mdd, handle);
1313         if (rc)
1314                 GOTO(stop, rc);
1315
1316         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1317         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1318                            mdd_object_capa(env, mdd_obj));
1319         mdd_write_unlock(env, mdd_obj);
1320         if (rc)
1321                 GOTO(stop, rc);
1322
1323         /* Only record system & user xattr changes */
1324         if (strncmp(XATTR_USER_PREFIX, name,
1325                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1326                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1327                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1328                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1329                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1330                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1331                                               handle);
1332
1333 stop:
1334         mdd_trans_stop(env, mdd, rc, handle);
1335
1336         RETURN(rc);
1337 }
1338
1339 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1340                 struct mdd_object *child, struct lu_attr *attr)
1341 {
1342         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1343         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1344         struct dt_object *nc = mdd_object_child(child);
1345
1346         /* @hint will be initialized by underlying device. */
1347         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1348 }
1349
1350 /*
1351  * do NOT or the MAY_*'s, you'll get the weakest
1352  */
1353 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1354 {
1355         int res = 0;
1356
1357         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1358          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1359          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1360          * owner can write to a file even if it is marked readonly to hide
1361          * its brokenness. (bug 5781) */
1362         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1363                 struct md_ucred *uc = md_ucred(env);
1364
1365                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1366                     (la->la_uid == uc->mu_fsuid))
1367                         return 0;
1368         }
1369
1370         if (flags & FMODE_READ)
1371                 res |= MAY_READ;
1372         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1373                 res |= MAY_WRITE;
1374         if (flags & MDS_FMODE_EXEC)
1375                 res = MAY_EXEC;
1376         return res;
1377 }
1378
1379 static int mdd_open_sanity_check(const struct lu_env *env,
1380                                  struct mdd_object *obj, int flag)
1381 {
1382         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1383         int mode, rc;
1384         ENTRY;
1385
1386         /* EEXIST check */
1387         if (mdd_is_dead_obj(obj))
1388                 RETURN(-ENOENT);
1389
1390         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1391         if (rc)
1392                RETURN(rc);
1393
1394         if (S_ISLNK(tmp_la->la_mode))
1395                 RETURN(-ELOOP);
1396
1397         mode = accmode(env, tmp_la, flag);
1398
1399         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1400                 RETURN(-EISDIR);
1401
1402         if (!(flag & MDS_OPEN_CREATED)) {
1403                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1404                 if (rc)
1405                         RETURN(rc);
1406         }
1407
1408         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1409             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1410                 flag &= ~MDS_OPEN_TRUNC;
1411
1412         /* For writing append-only file must open it with append mode. */
1413         if (mdd_is_append(obj)) {
1414                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1415                         RETURN(-EPERM);
1416                 if (flag & MDS_OPEN_TRUNC)
1417                         RETURN(-EPERM);
1418         }
1419
1420 #if 0
1421         /*
1422          * Now, flag -- O_NOATIME does not be packed by client.
1423          */
1424         if (flag & O_NOATIME) {
1425                 struct md_ucred *uc = md_ucred(env);
1426
1427                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1428                     (uc->mu_valid == UCRED_NEW)) &&
1429                     (uc->mu_fsuid != tmp_la->la_uid) &&
1430                     !mdd_capable(uc, CFS_CAP_FOWNER))
1431                         RETURN(-EPERM);
1432         }
1433 #endif
1434
1435         RETURN(0);
1436 }
1437
1438 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1439                     int flags)
1440 {
1441         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1442         int rc = 0;
1443
1444         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1445
1446         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1447         if (rc == 0)
1448                 mdd_obj->mod_count++;
1449
1450         mdd_write_unlock(env, mdd_obj);
1451         return rc;
1452 }
1453
1454 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1455                             struct md_attr *ma, struct thandle *handle)
1456 {
1457         return mdo_declare_destroy(env, obj, handle);
1458 }
1459
1460 /* return md_attr back,
1461  * if it is last unlink then return lov ea + llog cookie*/
1462 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1463                     struct md_attr *ma, struct thandle *handle)
1464 {
1465         int rc;
1466         ENTRY;
1467
1468         rc = mdo_destroy(env, obj, handle);
1469
1470         RETURN(rc);
1471 }
1472
1473 static int mdd_declare_close(const struct lu_env *env,
1474                              struct mdd_object *obj,
1475                              struct md_attr *ma,
1476                              struct thandle *handle)
1477 {
1478         int rc;
1479
1480         rc = orph_declare_index_delete(env, obj, handle);
1481         if (rc)
1482                 return rc;
1483
1484         return mdo_declare_destroy(env, obj, handle);
1485 }
1486
1487 /*
1488  * No permission check is needed.
1489  */
1490 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1491                      struct md_attr *ma, int mode)
1492 {
1493         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1494         struct mdd_device *mdd = mdo2mdd(obj);
1495         struct thandle    *handle = NULL;
1496         int rc, is_orphan = 0;
1497         ENTRY;
1498
1499         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1500                 mdd_obj->mod_count--;
1501
1502                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1503                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1504                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1505                 RETURN(0);
1506         }
1507
1508         /* check without any lock */
1509         if (mdd_obj->mod_count == 1 &&
1510             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1511  again:
1512                 handle = mdd_trans_create(env, mdo2mdd(obj));
1513                 if (IS_ERR(handle))
1514                         RETURN(PTR_ERR(handle));
1515
1516                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1517                 if (rc)
1518                         GOTO(stop, rc);
1519
1520                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1521                 if (rc)
1522                         GOTO(stop, rc);
1523
1524                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1525                 if (rc)
1526                         GOTO(stop, rc);
1527         }
1528
1529         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1530         if (handle == NULL && mdd_obj->mod_count == 1 &&
1531             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1532                 mdd_write_unlock(env, mdd_obj);
1533                 goto again;
1534         }
1535
1536         /* release open count */
1537         mdd_obj->mod_count --;
1538
1539         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1540                 /* remove link to object from orphan index */
1541                 LASSERT(handle != NULL);
1542                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1543                 if (rc == 0) {
1544                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1545                                "list, OSS objects to be destroyed.\n",
1546                                PFID(mdd_object_fid(mdd_obj)));
1547                         is_orphan = 1;
1548                 } else {
1549                         CERROR("Object "DFID" can not be deleted from orphan "
1550                                 "list, maybe cause OST objects can not be "
1551                                 "destroyed (err: %d).\n",
1552                                 PFID(mdd_object_fid(mdd_obj)), rc);
1553                         /* If object was not deleted from orphan list, do not
1554                          * destroy OSS objects, which will be done when next
1555                          * recovery. */
1556                         GOTO(out, rc);
1557                 }
1558         }
1559
1560         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1561                         mdd_object_capa(env, mdd_obj));
1562         /* Object maybe not in orphan list originally, it is rare case for
1563          * mdd_finish_unlink() failure. */
1564         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
1565                 if (handle == NULL) {
1566                         handle = mdd_trans_create(env, mdo2mdd(obj));
1567                         if (IS_ERR(handle))
1568                                 GOTO(out, rc = PTR_ERR(handle));
1569
1570                         rc = mdo_declare_destroy(env, mdd_obj, handle);
1571                         if (rc)
1572                                 GOTO(out, rc);
1573
1574                         rc = mdd_declare_changelog_store(env, mdd,
1575                                         NULL, handle);
1576                         if (rc)
1577                                 GOTO(stop, rc);
1578
1579                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1580                         if (rc)
1581                                 GOTO(out, rc);
1582                 }
1583
1584                 rc = mdo_destroy(env, mdd_obj, handle);
1585
1586                 if (rc != 0)
1587                         CERROR("Error when prepare to delete Object "DFID" , "
1588                                "which will cause OST objects can not be "
1589                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1590         }
1591         EXIT;
1592
1593 out:
1594
1595         mdd_write_unlock(env, mdd_obj);
1596
1597         if (rc == 0 &&
1598             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1599             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1600                 if (handle == NULL) {
1601                         handle = mdd_trans_create(env, mdo2mdd(obj));
1602                         if (IS_ERR(handle))
1603                                 GOTO(stop, rc = IS_ERR(handle));
1604
1605                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1606                                                          handle);
1607                         if (rc)
1608                                 GOTO(stop, rc);
1609
1610                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1611                         if (rc)
1612                                 GOTO(stop, rc);
1613                 }
1614
1615                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1616                                          mdd_obj, handle);
1617         }
1618
1619 stop:
1620         if (handle != NULL)
1621                 mdd_trans_stop(env, mdd, rc, handle);
1622         return rc;
1623 }
1624
1625 /*
1626  * Permission check is done when open,
1627  * no need check again.
1628  */
1629 static int mdd_readpage_sanity_check(const struct lu_env *env,
1630                                      struct mdd_object *obj)
1631 {
1632         struct dt_object *next = mdd_object_child(obj);
1633         int rc;
1634         ENTRY;
1635
1636         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1637                 rc = 0;
1638         else
1639                 rc = -ENOTDIR;
1640
1641         RETURN(rc);
1642 }
1643
1644 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
1645                               int nob, const struct dt_it_ops *iops,
1646                               struct dt_it *it, __u32 attr, void *arg)
1647 {
1648         struct lu_dirpage       *dp = &lp->lp_dir;
1649         void                    *area = dp;
1650         int                      result;
1651         __u64                    hash = 0;
1652         struct lu_dirent        *ent;
1653         struct lu_dirent        *last = NULL;
1654         int                      first = 1;
1655
1656         memset(area, 0, sizeof (*dp));
1657         area += sizeof (*dp);
1658         nob  -= sizeof (*dp);
1659
1660         ent  = area;
1661         do {
1662                 int    len;
1663                 int    recsize;
1664
1665                 len = iops->key_size(env, it);
1666
1667                 /* IAM iterator can return record with zero len. */
1668                 if (len == 0)
1669                         goto next;
1670
1671                 hash = iops->store(env, it);
1672                 if (unlikely(first)) {
1673                         first = 0;
1674                         dp->ldp_hash_start = cpu_to_le64(hash);
1675                 }
1676
1677                 /* calculate max space required for lu_dirent */
1678                 recsize = lu_dirent_calc_size(len, attr);
1679
1680                 if (nob >= recsize) {
1681                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
1682                         if (result == -ESTALE)
1683                                 goto next;
1684                         if (result != 0)
1685                                 goto out;
1686
1687                         /* osd might not able to pack all attributes,
1688                          * so recheck rec length */
1689                         recsize = le16_to_cpu(ent->lde_reclen);
1690                 } else {
1691                         result = (last != NULL) ? 0 :-EINVAL;
1692                         goto out;
1693                 }
1694                 last = ent;
1695                 ent = (void *)ent + recsize;
1696                 nob -= recsize;
1697
1698 next:
1699                 result = iops->next(env, it);
1700                 if (result == -ESTALE)
1701                         goto next;
1702         } while (result == 0);
1703
1704 out:
1705         dp->ldp_hash_end = cpu_to_le64(hash);
1706         if (last != NULL) {
1707                 if (last->lde_hash == dp->ldp_hash_end)
1708                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
1709                 last->lde_reclen = 0; /* end mark */
1710         }
1711         if (result > 0)
1712                 /* end of directory */
1713                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
1714         if (result < 0)
1715                 CWARN("build page failed: %d!\n", result);
1716         return result;
1717 }
1718
1719 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1720                  const struct lu_rdpg *rdpg)
1721 {
1722         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1723         int rc;
1724         ENTRY;
1725
1726         if (mdd_object_exists(mdd_obj) == 0) {
1727                 CERROR("%s: object "DFID" not found: rc = -2\n",
1728                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1729                 return -ENOENT;
1730         }
1731
1732         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
1733         rc = mdd_readpage_sanity_check(env, mdd_obj);
1734         if (rc)
1735                 GOTO(out_unlock, rc);
1736
1737         if (mdd_is_dead_obj(mdd_obj)) {
1738                 struct page *pg;
1739                 struct lu_dirpage *dp;
1740
1741                 /*
1742                  * According to POSIX, please do not return any entry to client:
1743                  * even dot and dotdot should not be returned.
1744                  */
1745                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
1746                        PFID(mdd_object_fid(mdd_obj)));
1747
1748                 if (rdpg->rp_count <= 0)
1749                         GOTO(out_unlock, rc = -EFAULT);
1750                 LASSERT(rdpg->rp_pages != NULL);
1751
1752                 pg = rdpg->rp_pages[0];
1753                 dp = (struct lu_dirpage*)cfs_kmap(pg);
1754                 memset(dp, 0 , sizeof(struct lu_dirpage));
1755                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1756                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
1757                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1758                 cfs_kunmap(pg);
1759                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
1760         }
1761
1762         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
1763                            mdd_dir_page_build, NULL);
1764         if (rc >= 0) {
1765                 struct lu_dirpage       *dp;
1766
1767                 dp = cfs_kmap(rdpg->rp_pages[0]);
1768                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1769                 if (rc == 0) {
1770                         /*
1771                          * No pages were processed, mark this for first page
1772                          * and send back.
1773                          */
1774                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1775                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
1776                 }
1777                 cfs_kunmap(rdpg->rp_pages[0]);
1778         }
1779
1780         GOTO(out_unlock, rc);
1781 out_unlock:
1782         mdd_read_unlock(env, mdd_obj);
1783         return rc;
1784 }
1785
1786 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
1787 {
1788         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1789
1790         if (mdd_object_exists(mdd_obj) == 0) {
1791                 CERROR("%s: object "DFID" not found: rc = -2\n",
1792                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1793                 return -ENOENT;
1794         }
1795         return dt_object_sync(env, mdd_object_child(mdd_obj));
1796 }
1797
1798 const struct md_object_operations mdd_obj_ops = {
1799         .moo_permission    = mdd_permission,
1800         .moo_attr_get      = mdd_attr_get,
1801         .moo_attr_set      = mdd_attr_set,
1802         .moo_xattr_get     = mdd_xattr_get,
1803         .moo_xattr_set     = mdd_xattr_set,
1804         .moo_xattr_list    = mdd_xattr_list,
1805         .moo_xattr_del     = mdd_xattr_del,
1806         .moo_open          = mdd_open,
1807         .moo_close         = mdd_close,
1808         .moo_readpage      = mdd_readpage,
1809         .moo_readlink      = mdd_readlink,
1810         .moo_changelog     = mdd_changelog,
1811         .moo_capa_get      = mdd_capa_get,
1812         .moo_object_sync   = mdd_object_sync,
1813         .moo_path          = mdd_path,
1814 };