Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         *buf = LU_BUF_NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 *buf = LU_BUF_NULL;
146         }
147         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         *buf = LU_BUF_NULL;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct lu_object *mdd_object_alloc(const struct lu_env *env,
183                                    const struct lu_object_header *hdr,
184                                    struct lu_device *d)
185 {
186         struct mdd_object *mdd_obj;
187
188         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
189         if (mdd_obj != NULL) {
190                 struct lu_object *o;
191
192                 o = mdd2lu_obj(mdd_obj);
193                 lu_object_init(o, NULL, d);
194                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
195                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
196                 mdd_obj->mod_count = 0;
197                 o->lo_ops = &mdd_lu_obj_ops;
198                 return o;
199         } else {
200                 return NULL;
201         }
202 }
203
204 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
205                            const struct lu_object_conf *unused)
206 {
207         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
208         struct mdd_object *mdd_obj = lu2mdd_obj(o);
209         struct lu_object  *below;
210         struct lu_device  *under;
211         ENTRY;
212
213         mdd_obj->mod_cltime = 0;
214         under = &d->mdd_child->dd_lu_dev;
215         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
216         mdd_pdlock_init(mdd_obj);
217         if (below == NULL)
218                 RETURN(-ENOMEM);
219
220         lu_object_add(o, below);
221
222         RETURN(0);
223 }
224
225 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
226 {
227         if (lu_object_exists(o))
228                 return mdd_get_flags(env, lu2mdd_obj(o));
229         else
230                 return 0;
231 }
232
233 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
234 {
235         struct mdd_object *mdd = lu2mdd_obj(o);
236
237         lu_object_fini(o);
238         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
239 }
240
241 static int mdd_object_print(const struct lu_env *env, void *cookie,
242                             lu_printer_t p, const struct lu_object *o)
243 {
244         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
245         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
246                     "valid=%x, cltime="LPU64", flags=%lx)",
247                     mdd, mdd->mod_count, mdd->mod_valid,
248                     mdd->mod_cltime, mdd->mod_flags);
249 }
250
251 static const struct lu_object_operations mdd_lu_obj_ops = {
252         .loo_object_init    = mdd_object_init,
253         .loo_object_start   = mdd_object_start,
254         .loo_object_free    = mdd_object_free,
255         .loo_object_print   = mdd_object_print,
256 };
257
258 struct mdd_object *mdd_object_find(const struct lu_env *env,
259                                    struct mdd_device *d,
260                                    const struct lu_fid *f)
261 {
262         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
263 }
264
265 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
266                         const char *path, struct lu_fid *fid)
267 {
268         struct lu_buf *buf;
269         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
270         struct mdd_object *obj;
271         struct lu_name *lname = &mdd_env_info(env)->mti_name;
272         char *name;
273         int rc = 0;
274         ENTRY;
275
276         /* temp buffer for path element */
277         buf = mdd_buf_alloc(env, PATH_MAX);
278         if (buf->lb_buf == NULL)
279                 RETURN(-ENOMEM);
280
281         lname->ln_name = name = buf->lb_buf;
282         lname->ln_namelen = 0;
283         *f = mdd->mdd_root_fid;
284
285         while(1) {
286                 while (*path == '/')
287                         path++;
288                 if (*path == '\0')
289                         break;
290                 while (*path != '/' && *path != '\0') {
291                         *name = *path;
292                         path++;
293                         name++;
294                         lname->ln_namelen++;
295                 }
296
297                 *name = '\0';
298                 /* find obj corresponding to fid */
299                 obj = mdd_object_find(env, mdd, f);
300                 if (obj == NULL)
301                         GOTO(out, rc = -EREMOTE);
302                 if (IS_ERR(obj))
303                         GOTO(out, rc = PTR_ERR(obj));
304                 /* get child fid from parent and name */
305                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
306                 mdd_object_put(env, obj);
307                 if (rc)
308                         break;
309
310                 name = buf->lb_buf;
311                 lname->ln_namelen = 0;
312         }
313
314         if (!rc)
315                 *fid = *f;
316 out:
317         RETURN(rc);
318 }
319
320 /** The maximum depth that fid2path() will search.
321  * This is limited only because we want to store the fids for
322  * historical path lookup purposes.
323  */
324 #define MAX_PATH_DEPTH 100
325
326 /** mdd_path() lookup structure. */
327 struct path_lookup_info {
328         __u64                pli_recno;        /**< history point */
329         __u64                pli_currec;       /**< current record */
330         struct lu_fid        pli_fid;
331         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
332         struct mdd_object   *pli_mdd_obj;
333         char                *pli_path;         /**< full path */
334         int                  pli_pathlen;
335         int                  pli_linkno;       /**< which hardlink to follow */
336         int                  pli_fidcount;     /**< number of \a pli_fids */
337 };
338
339 static int mdd_path_current(const struct lu_env *env,
340                             struct path_lookup_info *pli)
341 {
342         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
343         struct mdd_object *mdd_obj;
344         struct lu_buf     *buf = NULL;
345         struct link_ea_header *leh;
346         struct link_ea_entry  *lee;
347         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
348         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
349         char *ptr;
350         int reclen;
351         int rc;
352         ENTRY;
353
354         ptr = pli->pli_path + pli->pli_pathlen - 1;
355         *ptr = 0;
356         --ptr;
357         pli->pli_fidcount = 0;
358         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
359
360         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
361                 mdd_obj = mdd_object_find(env, mdd,
362                                           &pli->pli_fids[pli->pli_fidcount]);
363                 if (mdd_obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(mdd_obj))
366                         GOTO(out, rc = PTR_ERR(mdd_obj));
367                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
368                 if (rc <= 0) {
369                         mdd_object_put(env, mdd_obj);
370                         if (rc == -1)
371                                 rc = -EREMOTE;
372                         else if (rc == 0)
373                                 /* Do I need to error out here? */
374                                 rc = -ENOENT;
375                         GOTO(out, rc);
376                 }
377
378                 /* Get parent fid and object name */
379                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
380                 buf = mdd_links_get(env, mdd_obj);
381                 mdd_read_unlock(env, mdd_obj);
382                 mdd_object_put(env, mdd_obj);
383                 if (IS_ERR(buf))
384                         GOTO(out, rc = PTR_ERR(buf));
385
386                 leh = buf->lb_buf;
387                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
388                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
389
390                 /* If set, use link #linkno for path lookup, otherwise use
391                    link #0.  Only do this for the final path element. */
392                 if ((pli->pli_fidcount == 0) &&
393                     (pli->pli_linkno < leh->leh_reccount)) {
394                         int count;
395                         for (count = 0; count < pli->pli_linkno; count++) {
396                                 lee = (struct link_ea_entry *)
397                                      ((char *)lee + reclen);
398                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
399                         }
400                         if (pli->pli_linkno < leh->leh_reccount - 1)
401                                 /* indicate to user there are more links */
402                                 pli->pli_linkno++;
403                 }
404
405                 /* Pack the name in the end of the buffer */
406                 ptr -= tmpname->ln_namelen;
407                 if (ptr - 1 <= pli->pli_path)
408                         GOTO(out, rc = -EOVERFLOW);
409                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
410                 *(--ptr) = '/';
411
412                 /* Store the parent fid for historic lookup */
413                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
414                         GOTO(out, rc = -EOVERFLOW);
415                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
416         }
417
418         /* Verify that our path hasn't changed since we started the lookup.
419            Record the current index, and verify the path resolves to the
420            same fid. If it does, then the path is correct as of this index. */
421         spin_lock(&mdd->mdd_cl.mc_lock);
422         pli->pli_currec = mdd->mdd_cl.mc_index;
423         spin_unlock(&mdd->mdd_cl.mc_lock);
424         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
425         if (rc) {
426                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
427                 GOTO (out, rc = -EAGAIN);
428         }
429         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
430                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
431                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
432                        PFID(&pli->pli_fid));
433                 GOTO(out, rc = -EAGAIN);
434         }
435         ptr++; /* skip leading / */
436         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
437
438         EXIT;
439 out:
440         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
441                 /* if we vmalloced a large buffer drop it */
442                 mdd_buf_put(buf);
443
444         return rc;
445 }
446
447 static int mdd_path_historic(const struct lu_env *env,
448                              struct path_lookup_info *pli)
449 {
450         return 0;
451 }
452
453 /* Returns the full path to this fid, as of changelog record recno. */
454 static int mdd_path(const struct lu_env *env, struct md_object *obj,
455                     char *path, int pathlen, __u64 *recno, int *linkno)
456 {
457         struct path_lookup_info *pli;
458         int tries = 3;
459         int rc = -EAGAIN;
460         ENTRY;
461
462         if (pathlen < 3)
463                 RETURN(-EOVERFLOW);
464
465         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
466                 path[0] = '\0';
467                 RETURN(0);
468         }
469
470         OBD_ALLOC_PTR(pli);
471         if (pli == NULL)
472                 RETURN(-ENOMEM);
473
474         pli->pli_mdd_obj = md2mdd_obj(obj);
475         pli->pli_recno = *recno;
476         pli->pli_path = path;
477         pli->pli_pathlen = pathlen;
478         pli->pli_linkno = *linkno;
479
480         /* Retry multiple times in case file is being moved */
481         while (tries-- && rc == -EAGAIN)
482                 rc = mdd_path_current(env, pli);
483
484         /* For historical path lookup, the current links may not have existed
485          * at "recno" time.  We must switch over to earlier links/parents
486          * by using the changelog records.  If the earlier parent doesn't
487          * exist, we must search back through the changelog to reconstruct
488          * its parents, then check if it exists, etc.
489          * We may ignore this problem for the initial implementation and
490          * state that an "original" hardlink must still exist for us to find
491          * historic path name. */
492         if (pli->pli_recno != -1) {
493                 rc = mdd_path_historic(env, pli);
494         } else {
495                 *recno = pli->pli_currec;
496                 /* Return next link index to caller */
497                 *linkno = pli->pli_linkno;
498         }
499
500         OBD_FREE_PTR(pli);
501
502         RETURN (rc);
503 }
504
505 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
506 {
507         struct lu_attr *la = &mdd_env_info(env)->mti_la;
508         int rc;
509
510         ENTRY;
511         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
512         if (rc == 0) {
513                 mdd_flags_xlate(obj, la->la_flags);
514         }
515         RETURN(rc);
516 }
517
518 /*
519  * No permission check is needed.
520  */
521 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
522                  struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526
527         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
528                           mdd_object_capa(env, md2mdd_obj(obj)));
529         RETURN(rc);
530 }
531
532 /*
533  * No permission check is needed.
534  */
535 static int mdd_xattr_get(const struct lu_env *env,
536                          struct md_object *obj, struct lu_buf *buf,
537                          const char *name)
538 {
539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
540         int rc;
541
542         ENTRY;
543
544         if (mdd_object_exists(mdd_obj) == 0) {
545                 CERROR("%s: object "DFID" not found: rc = -2\n",
546                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
547                 return -ENOENT;
548         }
549
550         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
551         rc = mdo_xattr_get(env, mdd_obj, buf, name,
552                            mdd_object_capa(env, mdd_obj));
553         mdd_read_unlock(env, mdd_obj);
554
555         RETURN(rc);
556 }
557
558 /*
559  * Permission check is done when open,
560  * no need check again.
561  */
562 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
563                         struct lu_buf *buf)
564 {
565         struct mdd_object *mdd_obj = md2mdd_obj(obj);
566         struct dt_object  *next;
567         loff_t             pos = 0;
568         int                rc;
569         ENTRY;
570
571         if (mdd_object_exists(mdd_obj) == 0) {
572                 CERROR("%s: object "DFID" not found: rc = -2\n",
573                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
574                 return -ENOENT;
575         }
576
577         next = mdd_object_child(mdd_obj);
578         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
579         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
580                                          mdd_object_capa(env, mdd_obj));
581         mdd_read_unlock(env, mdd_obj);
582         RETURN(rc);
583 }
584
585 /*
586  * No permission check is needed.
587  */
588 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
589                           struct lu_buf *buf)
590 {
591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
592         int rc;
593
594         ENTRY;
595
596         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
597         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
598         mdd_read_unlock(env, mdd_obj);
599
600         RETURN(rc);
601 }
602
603 int mdd_declare_object_create_internal(const struct lu_env *env,
604                                        struct mdd_object *p,
605                                        struct mdd_object *c,
606                                        struct lu_attr *attr,
607                                        struct thandle *handle,
608                                        const struct md_op_spec *spec)
609 {
610         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
611         const struct dt_index_features *feat = spec->sp_feat;
612         int rc;
613         ENTRY;
614
615         if (feat != &dt_directory_features && feat != NULL) {
616                 dof->dof_type = DFT_INDEX;
617                 dof->u.dof_idx.di_feat = feat;
618
619         } else {
620                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
621                 if (dof->dof_type == DFT_REGULAR) {
622                         dof->u.dof_reg.striped =
623                                 md_should_create(spec->sp_cr_flags);
624                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
625                                 dof->u.dof_reg.striped = 0;
626                         /* is this replay? */
627                         if (spec->no_create)
628                                 dof->u.dof_reg.striped = 0;
629                 }
630         }
631
632         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
633
634         RETURN(rc);
635 }
636
637 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
638                                struct mdd_object *c, struct lu_attr *attr,
639                                struct thandle *handle,
640                                const struct md_op_spec *spec)
641 {
642         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
643         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
644         int rc;
645         ENTRY;
646
647         LASSERT(!mdd_object_exists(c));
648
649         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
650
651         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
652
653         RETURN(rc);
654 }
655
656 /**
657  * Make sure the ctime is increased only.
658  */
659 static inline int mdd_attr_check(const struct lu_env *env,
660                                  struct mdd_object *obj,
661                                  struct lu_attr *attr)
662 {
663         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
664         int rc;
665         ENTRY;
666
667         if (attr->la_valid & LA_CTIME) {
668                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
669                 if (rc)
670                         RETURN(rc);
671
672                 if (attr->la_ctime < tmp_la->la_ctime)
673                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
674                 else if (attr->la_valid == LA_CTIME &&
675                          attr->la_ctime == tmp_la->la_ctime)
676                         attr->la_valid &= ~LA_CTIME;
677         }
678         RETURN(0);
679 }
680
681 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
682                           struct lu_attr *attr, struct thandle *handle,
683                           int needacl)
684 {
685         int rc;
686         ENTRY;
687
688         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
689 #ifdef CONFIG_FS_POSIX_ACL
690         if (!rc && (attr->la_valid & LA_MODE) && needacl)
691                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
692 #endif
693         RETURN(rc);
694 }
695
696 int mdd_attr_check_set_internal(const struct lu_env *env,
697                                 struct mdd_object *obj, struct lu_attr *attr,
698                                 struct thandle *handle, int needacl)
699 {
700         int rc;
701         ENTRY;
702
703         rc = mdd_attr_check(env, obj, attr);
704         if (rc)
705                 RETURN(rc);
706
707         if (attr->la_valid)
708                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
709         RETURN(rc);
710 }
711
712 /*
713  * This gives the same functionality as the code between
714  * sys_chmod and inode_setattr
715  * chown_common and inode_setattr
716  * utimes and inode_setattr
717  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
718  */
719 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
720                         struct lu_attr *la, const unsigned long flags)
721 {
722         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
723         struct md_ucred  *uc;
724         int               rc;
725         ENTRY;
726
727         if (!la->la_valid)
728                 RETURN(0);
729
730         /* Do not permit change file type */
731         if (la->la_valid & LA_TYPE)
732                 RETURN(-EPERM);
733
734         /* They should not be processed by setattr */
735         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
736                 RETURN(-EPERM);
737
738         /* export destroy does not have ->le_ses, but we may want
739          * to drop LUSTRE_SOM_FL. */
740         if (!env->le_ses)
741                 RETURN(0);
742
743         uc = md_ucred(env);
744
745         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
746         if (rc)
747                 RETURN(rc);
748
749         if (la->la_valid == LA_CTIME) {
750                 if (!(flags & MDS_PERM_BYPASS))
751                         /* This is only for set ctime when rename's source is
752                          * on remote MDS. */
753                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
754                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
755                         la->la_valid &= ~LA_CTIME;
756                 RETURN(rc);
757         }
758
759         if (la->la_valid == LA_ATIME) {
760                 /* This is atime only set for read atime update on close. */
761                 if (la->la_atime >= tmp_la->la_atime &&
762                     la->la_atime < (tmp_la->la_atime +
763                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
764                         la->la_valid &= ~LA_ATIME;
765                 RETURN(0);
766         }
767
768         /* Check if flags change. */
769         if (la->la_valid & LA_FLAGS) {
770                 unsigned int oldflags = 0;
771                 unsigned int newflags = la->la_flags &
772                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
773
774                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
775                     !mdd_capable(uc, CFS_CAP_FOWNER))
776                         RETURN(-EPERM);
777
778                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
779                  * only be changed by the relevant capability. */
780                 if (mdd_is_immutable(obj))
781                         oldflags |= LUSTRE_IMMUTABLE_FL;
782                 if (mdd_is_append(obj))
783                         oldflags |= LUSTRE_APPEND_FL;
784                 if ((oldflags ^ newflags) &&
785                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
786                         RETURN(-EPERM);
787
788                 if (!S_ISDIR(tmp_la->la_mode))
789                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
790         }
791
792         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
793             (la->la_valid & ~LA_FLAGS) &&
794             !(flags & MDS_PERM_BYPASS))
795                 RETURN(-EPERM);
796
797         /* Check for setting the obj time. */
798         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
799             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
800                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
801                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
802                         rc = mdd_permission_internal(env, obj, tmp_la,
803                                                      MAY_WRITE);
804                         if (rc)
805                                 RETURN(rc);
806                 }
807         }
808
809         if (la->la_valid & LA_KILL_SUID) {
810                 la->la_valid &= ~LA_KILL_SUID;
811                 if ((tmp_la->la_mode & S_ISUID) &&
812                     !(la->la_valid & LA_MODE)) {
813                         la->la_mode = tmp_la->la_mode;
814                         la->la_valid |= LA_MODE;
815                 }
816                 la->la_mode &= ~S_ISUID;
817         }
818
819         if (la->la_valid & LA_KILL_SGID) {
820                 la->la_valid &= ~LA_KILL_SGID;
821                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
822                                         (S_ISGID | S_IXGRP)) &&
823                     !(la->la_valid & LA_MODE)) {
824                         la->la_mode = tmp_la->la_mode;
825                         la->la_valid |= LA_MODE;
826                 }
827                 la->la_mode &= ~S_ISGID;
828         }
829
830         /* Make sure a caller can chmod. */
831         if (la->la_valid & LA_MODE) {
832                 if (!(flags & MDS_PERM_BYPASS) &&
833                     (uc->mu_fsuid != tmp_la->la_uid) &&
834                     !mdd_capable(uc, CFS_CAP_FOWNER))
835                         RETURN(-EPERM);
836
837                 if (la->la_mode == (cfs_umode_t) -1)
838                         la->la_mode = tmp_la->la_mode;
839                 else
840                         la->la_mode = (la->la_mode & S_IALLUGO) |
841                                       (tmp_la->la_mode & ~S_IALLUGO);
842
843                 /* Also check the setgid bit! */
844                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
845                                        la->la_gid : tmp_la->la_gid) &&
846                     !mdd_capable(uc, CFS_CAP_FSETID))
847                         la->la_mode &= ~S_ISGID;
848         } else {
849                la->la_mode = tmp_la->la_mode;
850         }
851
852         /* Make sure a caller can chown. */
853         if (la->la_valid & LA_UID) {
854                 if (la->la_uid == (uid_t) -1)
855                         la->la_uid = tmp_la->la_uid;
856                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
857                     (la->la_uid != tmp_la->la_uid)) &&
858                     !mdd_capable(uc, CFS_CAP_CHOWN))
859                         RETURN(-EPERM);
860
861                 /* If the user or group of a non-directory has been
862                  * changed by a non-root user, remove the setuid bit.
863                  * 19981026 David C Niemi <niemi@tux.org>
864                  *
865                  * Changed this to apply to all users, including root,
866                  * to avoid some races. This is the behavior we had in
867                  * 2.0. The check for non-root was definitely wrong
868                  * for 2.2 anyway, as it should have been using
869                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
870                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
871                     !S_ISDIR(tmp_la->la_mode)) {
872                         la->la_mode &= ~S_ISUID;
873                         la->la_valid |= LA_MODE;
874                 }
875         }
876
877         /* Make sure caller can chgrp. */
878         if (la->la_valid & LA_GID) {
879                 if (la->la_gid == (gid_t) -1)
880                         la->la_gid = tmp_la->la_gid;
881                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
882                     ((la->la_gid != tmp_la->la_gid) &&
883                     !lustre_in_group_p(uc, la->la_gid))) &&
884                     !mdd_capable(uc, CFS_CAP_CHOWN))
885                         RETURN(-EPERM);
886
887                 /* Likewise, if the user or group of a non-directory
888                  * has been changed by a non-root user, remove the
889                  * setgid bit UNLESS there is no group execute bit
890                  * (this would be a file marked for mandatory
891                  * locking).  19981026 David C Niemi <niemi@tux.org>
892                  *
893                  * Removed the fsuid check (see the comment above) --
894                  * 19990830 SD. */
895                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
896                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
897                         la->la_mode &= ~S_ISGID;
898                         la->la_valid |= LA_MODE;
899                 }
900         }
901
902         /* For both Size-on-MDS case and truncate case,
903          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
904          * We distinguish them by "flags & MDS_SOM".
905          * For SOM case, it is true, the MAY_WRITE perm has been checked
906          * when open, no need check again. For truncate case, it is false,
907          * the MAY_WRITE perm should be checked here. */
908         if (flags & MDS_SOM) {
909                 /* For the "Size-on-MDS" setattr update, merge coming
910                  * attributes with the set in the inode. BUG 10641 */
911                 if ((la->la_valid & LA_ATIME) &&
912                     (la->la_atime <= tmp_la->la_atime))
913                         la->la_valid &= ~LA_ATIME;
914
915                 /* OST attributes do not have a priority over MDS attributes,
916                  * so drop times if ctime is equal. */
917                 if ((la->la_valid & LA_CTIME) &&
918                     (la->la_ctime <= tmp_la->la_ctime))
919                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
920         } else {
921                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
922                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
923                               (uc->mu_fsuid == tmp_la->la_uid)) &&
924                             !(flags & MDS_PERM_BYPASS)) {
925                                 rc = mdd_permission_internal(env, obj,
926                                                              tmp_la, MAY_WRITE);
927                                 if (rc)
928                                         RETURN(rc);
929                         }
930                 }
931                 if (la->la_valid & LA_CTIME) {
932                         /* The pure setattr, it has the priority over what is
933                          * already set, do not drop it if ctime is equal. */
934                         if (la->la_ctime < tmp_la->la_ctime)
935                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
936                                                   LA_CTIME);
937                 }
938         }
939
940         RETURN(0);
941 }
942
943 /** Store a data change changelog record
944  * If this fails, we must fail the whole transaction; we don't
945  * want the change to commit without the log entry.
946  * \param mdd_obj - mdd_object of change
947  * \param handle - transacion handle
948  */
949 static int mdd_changelog_data_store(const struct lu_env *env,
950                                     struct mdd_device *mdd,
951                                     enum changelog_rec_type type,
952                                     int flags, struct mdd_object *mdd_obj,
953                                     struct thandle *handle)
954 {
955         const struct lu_fid             *tfid = mdo2fid(mdd_obj);
956         struct llog_changelog_rec       *rec;
957         struct lu_buf                   *buf;
958         int                              reclen;
959         int                              rc;
960
961         /* Not recording */
962         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
963                 RETURN(0);
964         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
965                 RETURN(0);
966
967         LASSERT(mdd_obj != NULL);
968         LASSERT(handle != NULL);
969
970         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
971             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
972                 /* Don't need multiple updates in this log */
973                 /* Don't check under lock - no big deal if we get an extra
974                    entry */
975                 RETURN(0);
976         }
977
978         reclen = llog_data_len(sizeof(*rec));
979         buf = mdd_buf_alloc(env, reclen);
980         if (buf->lb_buf == NULL)
981                 RETURN(-ENOMEM);
982         rec = buf->lb_buf;
983
984         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
985         rec->cr.cr_type = (__u32)type;
986         rec->cr.cr_tfid = *tfid;
987         rec->cr.cr_namelen = 0;
988         mdd_obj->mod_cltime = cfs_time_current_64();
989
990         rc = mdd_changelog_store(env, mdd, rec, handle);
991
992         RETURN(rc);
993 }
994
995 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
996                   int flags, struct md_object *obj)
997 {
998         struct thandle *handle;
999         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1000         struct mdd_device *mdd = mdo2mdd(obj);
1001         int rc;
1002         ENTRY;
1003
1004         handle = mdd_trans_create(env, mdd);
1005         if (IS_ERR(handle))
1006                 RETURN(PTR_ERR(handle));
1007
1008         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1009         if (rc)
1010                 GOTO(stop, rc);
1011
1012         rc = mdd_trans_start(env, mdd, handle);
1013         if (rc)
1014                 GOTO(stop, rc);
1015
1016         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1017                                       handle);
1018
1019 stop:
1020         mdd_trans_stop(env, mdd, rc, handle);
1021
1022         RETURN(rc);
1023 }
1024
1025 /**
1026  * Save LMA extended attributes with data from \a ma.
1027  *
1028  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1029  * not, LMA EA will be first read from disk, modified and write back.
1030  *
1031  */
1032 /* Precedence for choosing record type when multiple
1033  * attributes change: setattr > mtime > ctime > atime
1034  * (ctime changes when mtime does, plus chmod/chown.
1035  * atime and ctime are independent.) */
1036 static int mdd_attr_set_changelog(const struct lu_env *env,
1037                                   struct md_object *obj, struct thandle *handle,
1038                                   __u64 valid)
1039 {
1040         struct mdd_device *mdd = mdo2mdd(obj);
1041         int bits, type = 0;
1042
1043         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1044         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1045         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1046         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1047         bits = bits & mdd->mdd_cl.mc_mask;
1048         if (bits == 0)
1049                 return 0;
1050
1051         /* The record type is the lowest non-masked set bit */
1052         while (bits && ((bits & 1) == 0)) {
1053                 bits = bits >> 1;
1054                 type++;
1055         }
1056
1057         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1058         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1059                                         md2mdd_obj(obj), handle);
1060 }
1061
1062 static int mdd_declare_attr_set(const struct lu_env *env,
1063                                 struct mdd_device *mdd,
1064                                 struct mdd_object *obj,
1065                                 const struct lu_attr *attr,
1066                                 struct thandle *handle)
1067 {
1068         int rc;
1069
1070         rc = mdo_declare_attr_set(env, obj, attr, handle);
1071         if (rc)
1072                 return rc;
1073
1074 #ifdef CONFIG_FS_POSIX_ACL
1075         if (attr->la_valid & LA_MODE) {
1076                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1077                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1078                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1079                 mdd_read_unlock(env, obj);
1080                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1081                         rc = 0;
1082                 else if (rc < 0)
1083                         return rc;
1084
1085                 if (rc != 0) {
1086                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1087                         rc = mdo_declare_xattr_set(env, obj, buf,
1088                                                    XATTR_NAME_ACL_ACCESS, 0,
1089                                                    handle);
1090                         if (rc)
1091                                 return rc;
1092                 }
1093         }
1094 #endif
1095
1096         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1097         return rc;
1098 }
1099
1100 /* set attr and LOV EA at once, return updated attr */
1101 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1102                  const struct md_attr *ma)
1103 {
1104         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1105         struct mdd_device *mdd = mdo2mdd(obj);
1106         struct thandle *handle;
1107         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1108         const struct lu_attr *la = &ma->ma_attr;
1109         int rc;
1110         ENTRY;
1111
1112         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1113         LASSERT((ma->ma_valid & MA_LOV) == 0);
1114         LASSERT((ma->ma_valid & MA_HSM) == 0);
1115         LASSERT((ma->ma_valid & MA_SOM) == 0);
1116
1117         *la_copy = ma->ma_attr;
1118         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1119         if (rc)
1120                 RETURN(rc);
1121
1122         /* setattr on "close" only change atime, or do nothing */
1123         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1124                 RETURN(0);
1125
1126         handle = mdd_trans_create(env, mdd);
1127         if (IS_ERR(handle))
1128                 RETURN(PTR_ERR(handle));
1129
1130         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1131         if (rc)
1132                 GOTO(stop, rc);
1133
1134         rc = mdd_trans_start(env, mdd, handle);
1135         if (rc)
1136                 GOTO(stop, rc);
1137
1138         /* permission changes may require sync operation */
1139         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1140                 handle->th_sync |= !!mdd->mdd_sync_permission;
1141
1142         if (la->la_valid & (LA_MTIME | LA_CTIME))
1143                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1144                        la->la_mtime, la->la_ctime);
1145
1146         if (la_copy->la_valid & LA_FLAGS) {
1147                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1148                 if (rc == 0)
1149                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1150         } else if (la_copy->la_valid) {            /* setattr */
1151                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1152         }
1153
1154         if (rc == 0)
1155                 rc = mdd_attr_set_changelog(env, obj, handle,
1156                                             la->la_valid);
1157 stop:
1158         mdd_trans_stop(env, mdd, rc, handle);
1159         RETURN(rc);
1160 }
1161
1162 static int mdd_xattr_sanity_check(const struct lu_env *env,
1163                                   struct mdd_object *obj)
1164 {
1165         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1166         struct md_ucred *uc     = md_ucred(env);
1167         int rc;
1168         ENTRY;
1169
1170         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1171                 RETURN(-EPERM);
1172
1173         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1174         if (rc)
1175                 RETURN(rc);
1176
1177         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1178             !mdd_capable(uc, CFS_CAP_FOWNER))
1179                 RETURN(-EPERM);
1180
1181         RETURN(rc);
1182 }
1183
1184 static int mdd_declare_xattr_set(const struct lu_env *env,
1185                                  struct mdd_device *mdd,
1186                                  struct mdd_object *obj,
1187                                  const struct lu_buf *buf,
1188                                  const char *name,
1189                                  struct thandle *handle)
1190 {
1191         int rc;
1192
1193         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1194         if (rc)
1195                 return rc;
1196
1197         /* Only record user xattr changes */
1198         if ((strncmp("user.", name, 5) == 0))
1199                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1200
1201         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1202         return rc;
1203 }
1204
1205 /**
1206  * The caller should guarantee to update the object ctime
1207  * after xattr_set if needed.
1208  */
1209 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1210                          const struct lu_buf *buf, const char *name,
1211                          int fl)
1212 {
1213         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1214         struct mdd_device *mdd = mdo2mdd(obj);
1215         struct thandle *handle;
1216         int  rc;
1217         ENTRY;
1218
1219         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1220                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1221                 RETURN(rc);
1222         }
1223
1224         rc = mdd_xattr_sanity_check(env, mdd_obj);
1225         if (rc)
1226                 RETURN(rc);
1227
1228         handle = mdd_trans_create(env, mdd);
1229         if (IS_ERR(handle))
1230                 RETURN(PTR_ERR(handle));
1231
1232         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1233         if (rc)
1234                 GOTO(stop, rc);
1235
1236         rc = mdd_trans_start(env, mdd, handle);
1237         if (rc)
1238                 GOTO(stop, rc);
1239
1240         /* security-replated changes may require sync */
1241         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1242                 handle->th_sync |= !!mdd->mdd_sync_permission;
1243
1244         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1245         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1246                            mdd_object_capa(env, mdd_obj));
1247         mdd_write_unlock(env, mdd_obj);
1248         if (rc)
1249                 GOTO(stop, rc);
1250
1251         /* Only record system & user xattr changes */
1252         if (strncmp(XATTR_USER_PREFIX, name,
1253                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1254                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1255                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1256                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1257                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1258                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1259                                               handle);
1260
1261 stop:
1262         mdd_trans_stop(env, mdd, rc, handle);
1263
1264         RETURN(rc);
1265 }
1266
1267 static int mdd_declare_xattr_del(const struct lu_env *env,
1268                                  struct mdd_device *mdd,
1269                                  struct mdd_object *obj,
1270                                  const char *name,
1271                                  struct thandle *handle)
1272 {
1273         int rc;
1274
1275         rc = mdo_declare_xattr_del(env, obj, name, handle);
1276         if (rc)
1277                 return rc;
1278
1279         /* Only record user xattr changes */
1280         if ((strncmp("user.", name, 5) == 0))
1281                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1282
1283         return rc;
1284 }
1285
1286 /**
1287  * The caller should guarantee to update the object ctime
1288  * after xattr_set if needed.
1289  */
1290 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1291                   const char *name)
1292 {
1293         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1294         struct mdd_device *mdd = mdo2mdd(obj);
1295         struct thandle *handle;
1296         int  rc;
1297         ENTRY;
1298
1299         rc = mdd_xattr_sanity_check(env, mdd_obj);
1300         if (rc)
1301                 RETURN(rc);
1302
1303         handle = mdd_trans_create(env, mdd);
1304         if (IS_ERR(handle))
1305                 RETURN(PTR_ERR(handle));
1306
1307         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1308         if (rc)
1309                 GOTO(stop, rc);
1310
1311         rc = mdd_trans_start(env, mdd, handle);
1312         if (rc)
1313                 GOTO(stop, rc);
1314
1315         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1316         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1317                            mdd_object_capa(env, mdd_obj));
1318         mdd_write_unlock(env, mdd_obj);
1319         if (rc)
1320                 GOTO(stop, rc);
1321
1322         /* Only record system & user xattr changes */
1323         if (strncmp(XATTR_USER_PREFIX, name,
1324                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1325                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1326                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1327                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1328                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1329                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1330                                               handle);
1331
1332 stop:
1333         mdd_trans_stop(env, mdd, rc, handle);
1334
1335         RETURN(rc);
1336 }
1337
1338 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1339                 struct mdd_object *child, struct lu_attr *attr)
1340 {
1341         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1342         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1343         struct dt_object *nc = mdd_object_child(child);
1344
1345         /* @hint will be initialized by underlying device. */
1346         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1347 }
1348
1349 /*
1350  * do NOT or the MAY_*'s, you'll get the weakest
1351  */
1352 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1353 {
1354         int res = 0;
1355
1356         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1357          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1358          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1359          * owner can write to a file even if it is marked readonly to hide
1360          * its brokenness. (bug 5781) */
1361         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1362                 struct md_ucred *uc = md_ucred(env);
1363
1364                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1365                     (la->la_uid == uc->mu_fsuid))
1366                         return 0;
1367         }
1368
1369         if (flags & FMODE_READ)
1370                 res |= MAY_READ;
1371         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1372                 res |= MAY_WRITE;
1373         if (flags & MDS_FMODE_EXEC)
1374                 res = MAY_EXEC;
1375         return res;
1376 }
1377
1378 static int mdd_open_sanity_check(const struct lu_env *env,
1379                                  struct mdd_object *obj, int flag)
1380 {
1381         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1382         int mode, rc;
1383         ENTRY;
1384
1385         /* EEXIST check */
1386         if (mdd_is_dead_obj(obj))
1387                 RETURN(-ENOENT);
1388
1389         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1390         if (rc)
1391                RETURN(rc);
1392
1393         if (S_ISLNK(tmp_la->la_mode))
1394                 RETURN(-ELOOP);
1395
1396         mode = accmode(env, tmp_la, flag);
1397
1398         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1399                 RETURN(-EISDIR);
1400
1401         if (!(flag & MDS_OPEN_CREATED)) {
1402                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1403                 if (rc)
1404                         RETURN(rc);
1405         }
1406
1407         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1408             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1409                 flag &= ~MDS_OPEN_TRUNC;
1410
1411         /* For writing append-only file must open it with append mode. */
1412         if (mdd_is_append(obj)) {
1413                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1414                         RETURN(-EPERM);
1415                 if (flag & MDS_OPEN_TRUNC)
1416                         RETURN(-EPERM);
1417         }
1418
1419 #if 0
1420         /*
1421          * Now, flag -- O_NOATIME does not be packed by client.
1422          */
1423         if (flag & O_NOATIME) {
1424                 struct md_ucred *uc = md_ucred(env);
1425
1426                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1427                     (uc->mu_valid == UCRED_NEW)) &&
1428                     (uc->mu_fsuid != tmp_la->la_uid) &&
1429                     !mdd_capable(uc, CFS_CAP_FOWNER))
1430                         RETURN(-EPERM);
1431         }
1432 #endif
1433
1434         RETURN(0);
1435 }
1436
1437 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1438                     int flags)
1439 {
1440         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1441         int rc = 0;
1442
1443         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1444
1445         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1446         if (rc == 0)
1447                 mdd_obj->mod_count++;
1448
1449         mdd_write_unlock(env, mdd_obj);
1450         return rc;
1451 }
1452
1453 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1454                             struct md_attr *ma, struct thandle *handle)
1455 {
1456         return mdo_declare_destroy(env, obj, handle);
1457 }
1458
1459 /* return md_attr back,
1460  * if it is last unlink then return lov ea + llog cookie*/
1461 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1462                     struct md_attr *ma, struct thandle *handle)
1463 {
1464         int rc;
1465         ENTRY;
1466
1467         rc = mdo_destroy(env, obj, handle);
1468
1469         RETURN(rc);
1470 }
1471
1472 static int mdd_declare_close(const struct lu_env *env,
1473                              struct mdd_object *obj,
1474                              struct md_attr *ma,
1475                              struct thandle *handle)
1476 {
1477         int rc;
1478
1479         rc = orph_declare_index_delete(env, obj, handle);
1480         if (rc)
1481                 return rc;
1482
1483         return mdo_declare_destroy(env, obj, handle);
1484 }
1485
1486 /*
1487  * No permission check is needed.
1488  */
1489 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1490                      struct md_attr *ma, int mode)
1491 {
1492         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1493         struct mdd_device *mdd = mdo2mdd(obj);
1494         struct thandle    *handle = NULL;
1495         int rc, is_orphan = 0;
1496         ENTRY;
1497
1498         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1499                 mdd_obj->mod_count--;
1500
1501                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1502                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1503                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1504                 RETURN(0);
1505         }
1506
1507         /* check without any lock */
1508         if (mdd_obj->mod_count == 1 &&
1509             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1510  again:
1511                 handle = mdd_trans_create(env, mdo2mdd(obj));
1512                 if (IS_ERR(handle))
1513                         RETURN(PTR_ERR(handle));
1514
1515                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1516                 if (rc)
1517                         GOTO(stop, rc);
1518
1519                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1520                 if (rc)
1521                         GOTO(stop, rc);
1522
1523                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1524                 if (rc)
1525                         GOTO(stop, rc);
1526         }
1527
1528         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1529         if (handle == NULL && mdd_obj->mod_count == 1 &&
1530             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1531                 mdd_write_unlock(env, mdd_obj);
1532                 goto again;
1533         }
1534
1535         /* release open count */
1536         mdd_obj->mod_count --;
1537
1538         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1539                 /* remove link to object from orphan index */
1540                 LASSERT(handle != NULL);
1541                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1542                 if (rc == 0) {
1543                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1544                                "list, OSS objects to be destroyed.\n",
1545                                PFID(mdd_object_fid(mdd_obj)));
1546                         is_orphan = 1;
1547                 } else {
1548                         CERROR("Object "DFID" can not be deleted from orphan "
1549                                 "list, maybe cause OST objects can not be "
1550                                 "destroyed (err: %d).\n",
1551                                 PFID(mdd_object_fid(mdd_obj)), rc);
1552                         /* If object was not deleted from orphan list, do not
1553                          * destroy OSS objects, which will be done when next
1554                          * recovery. */
1555                         GOTO(out, rc);
1556                 }
1557         }
1558
1559         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1560                         mdd_object_capa(env, mdd_obj));
1561         /* Object maybe not in orphan list originally, it is rare case for
1562          * mdd_finish_unlink() failure. */
1563         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
1564                 if (handle == NULL) {
1565                         handle = mdd_trans_create(env, mdo2mdd(obj));
1566                         if (IS_ERR(handle))
1567                                 GOTO(out, rc = PTR_ERR(handle));
1568
1569                         rc = mdo_declare_destroy(env, mdd_obj, handle);
1570                         if (rc)
1571                                 GOTO(out, rc);
1572
1573                         rc = mdd_declare_changelog_store(env, mdd,
1574                                         NULL, handle);
1575                         if (rc)
1576                                 GOTO(stop, rc);
1577
1578                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1579                         if (rc)
1580                                 GOTO(out, rc);
1581                 }
1582
1583                 rc = mdo_destroy(env, mdd_obj, handle);
1584
1585                 if (rc != 0)
1586                         CERROR("Error when prepare to delete Object "DFID" , "
1587                                "which will cause OST objects can not be "
1588                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1589         }
1590         EXIT;
1591
1592 out:
1593
1594         mdd_write_unlock(env, mdd_obj);
1595
1596         if (rc == 0 &&
1597             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1598             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1599                 if (handle == NULL) {
1600                         handle = mdd_trans_create(env, mdo2mdd(obj));
1601                         if (IS_ERR(handle))
1602                                 GOTO(stop, rc = IS_ERR(handle));
1603
1604                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1605                                                          handle);
1606                         if (rc)
1607                                 GOTO(stop, rc);
1608
1609                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1610                         if (rc)
1611                                 GOTO(stop, rc);
1612                 }
1613
1614                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1615                                          mdd_obj, handle);
1616         }
1617
1618 stop:
1619         if (handle != NULL)
1620                 mdd_trans_stop(env, mdd, rc, handle);
1621         return rc;
1622 }
1623
1624 /*
1625  * Permission check is done when open,
1626  * no need check again.
1627  */
1628 static int mdd_readpage_sanity_check(const struct lu_env *env,
1629                                      struct mdd_object *obj)
1630 {
1631         struct dt_object *next = mdd_object_child(obj);
1632         int rc;
1633         ENTRY;
1634
1635         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1636                 rc = 0;
1637         else
1638                 rc = -ENOTDIR;
1639
1640         RETURN(rc);
1641 }
1642
1643 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
1644                               int nob, const struct dt_it_ops *iops,
1645                               struct dt_it *it, __u32 attr, void *arg)
1646 {
1647         struct lu_dirpage       *dp = &lp->lp_dir;
1648         void                    *area = dp;
1649         int                      result;
1650         __u64                    hash = 0;
1651         struct lu_dirent        *ent;
1652         struct lu_dirent        *last = NULL;
1653         int                      first = 1;
1654
1655         memset(area, 0, sizeof (*dp));
1656         area += sizeof (*dp);
1657         nob  -= sizeof (*dp);
1658
1659         ent  = area;
1660         do {
1661                 int    len;
1662                 int    recsize;
1663
1664                 len = iops->key_size(env, it);
1665
1666                 /* IAM iterator can return record with zero len. */
1667                 if (len == 0)
1668                         goto next;
1669
1670                 hash = iops->store(env, it);
1671                 if (unlikely(first)) {
1672                         first = 0;
1673                         dp->ldp_hash_start = cpu_to_le64(hash);
1674                 }
1675
1676                 /* calculate max space required for lu_dirent */
1677                 recsize = lu_dirent_calc_size(len, attr);
1678
1679                 if (nob >= recsize) {
1680                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
1681                         if (result == -ESTALE)
1682                                 goto next;
1683                         if (result != 0)
1684                                 goto out;
1685
1686                         /* osd might not able to pack all attributes,
1687                          * so recheck rec length */
1688                         recsize = le16_to_cpu(ent->lde_reclen);
1689                 } else {
1690                         result = (last != NULL) ? 0 :-EINVAL;
1691                         goto out;
1692                 }
1693                 last = ent;
1694                 ent = (void *)ent + recsize;
1695                 nob -= recsize;
1696
1697 next:
1698                 result = iops->next(env, it);
1699                 if (result == -ESTALE)
1700                         goto next;
1701         } while (result == 0);
1702
1703 out:
1704         dp->ldp_hash_end = cpu_to_le64(hash);
1705         if (last != NULL) {
1706                 if (last->lde_hash == dp->ldp_hash_end)
1707                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
1708                 last->lde_reclen = 0; /* end mark */
1709         }
1710         if (result > 0)
1711                 /* end of directory */
1712                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
1713         if (result < 0)
1714                 CWARN("build page failed: %d!\n", result);
1715         return result;
1716 }
1717
1718 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1719                  const struct lu_rdpg *rdpg)
1720 {
1721         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1722         int rc;
1723         ENTRY;
1724
1725         if (mdd_object_exists(mdd_obj) == 0) {
1726                 CERROR("%s: object "DFID" not found: rc = -2\n",
1727                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1728                 return -ENOENT;
1729         }
1730
1731         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
1732         rc = mdd_readpage_sanity_check(env, mdd_obj);
1733         if (rc)
1734                 GOTO(out_unlock, rc);
1735
1736         if (mdd_is_dead_obj(mdd_obj)) {
1737                 struct page *pg;
1738                 struct lu_dirpage *dp;
1739
1740                 /*
1741                  * According to POSIX, please do not return any entry to client:
1742                  * even dot and dotdot should not be returned.
1743                  */
1744                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
1745                        PFID(mdd_object_fid(mdd_obj)));
1746
1747                 if (rdpg->rp_count <= 0)
1748                         GOTO(out_unlock, rc = -EFAULT);
1749                 LASSERT(rdpg->rp_pages != NULL);
1750
1751                 pg = rdpg->rp_pages[0];
1752                 dp = (struct lu_dirpage*)cfs_kmap(pg);
1753                 memset(dp, 0 , sizeof(struct lu_dirpage));
1754                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1755                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
1756                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1757                 cfs_kunmap(pg);
1758                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
1759         }
1760
1761         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
1762                            mdd_dir_page_build, NULL);
1763         if (rc >= 0) {
1764                 struct lu_dirpage       *dp;
1765
1766                 dp = cfs_kmap(rdpg->rp_pages[0]);
1767                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1768                 if (rc == 0) {
1769                         /*
1770                          * No pages were processed, mark this for first page
1771                          * and send back.
1772                          */
1773                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1774                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
1775                 }
1776                 cfs_kunmap(rdpg->rp_pages[0]);
1777         }
1778
1779         GOTO(out_unlock, rc);
1780 out_unlock:
1781         mdd_read_unlock(env, mdd_obj);
1782         return rc;
1783 }
1784
1785 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
1786 {
1787         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1788
1789         if (mdd_object_exists(mdd_obj) == 0) {
1790                 CERROR("%s: object "DFID" not found: rc = -2\n",
1791                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1792                 return -ENOENT;
1793         }
1794         return dt_object_sync(env, mdd_object_child(mdd_obj));
1795 }
1796
1797 const struct md_object_operations mdd_obj_ops = {
1798         .moo_permission    = mdd_permission,
1799         .moo_attr_get      = mdd_attr_get,
1800         .moo_attr_set      = mdd_attr_set,
1801         .moo_xattr_get     = mdd_xattr_get,
1802         .moo_xattr_set     = mdd_xattr_set,
1803         .moo_xattr_list    = mdd_xattr_list,
1804         .moo_xattr_del     = mdd_xattr_del,
1805         .moo_open          = mdd_open,
1806         .moo_close         = mdd_close,
1807         .moo_readpage      = mdd_readpage,
1808         .moo_readlink      = mdd_readlink,
1809         .moo_changelog     = mdd_changelog,
1810         .moo_capa_get      = mdd_capa_get,
1811         .moo_object_sync   = mdd_object_sync,
1812         .moo_path          = mdd_path,
1813 };