Whamcloud - gitweb
d4eff75567bc10433cb07423867a5c0a6ad33b86
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         *buf = LU_BUF_NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 *buf = LU_BUF_NULL;
146         }
147         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         *buf = LU_BUF_NULL;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct lu_object *mdd_object_alloc(const struct lu_env *env,
183                                    const struct lu_object_header *hdr,
184                                    struct lu_device *d)
185 {
186         struct mdd_object *mdd_obj;
187
188         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
189         if (mdd_obj != NULL) {
190                 struct lu_object *o;
191
192                 o = mdd2lu_obj(mdd_obj);
193                 lu_object_init(o, NULL, d);
194                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
195                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
196                 mdd_obj->mod_count = 0;
197                 o->lo_ops = &mdd_lu_obj_ops;
198                 return o;
199         } else {
200                 return NULL;
201         }
202 }
203
204 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
205                            const struct lu_object_conf *unused)
206 {
207         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
208         struct mdd_object *mdd_obj = lu2mdd_obj(o);
209         struct lu_object  *below;
210         struct lu_device  *under;
211         ENTRY;
212
213         mdd_obj->mod_cltime = 0;
214         under = &d->mdd_child->dd_lu_dev;
215         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
216         mdd_pdlock_init(mdd_obj);
217         if (below == NULL)
218                 RETURN(-ENOMEM);
219
220         lu_object_add(o, below);
221
222         RETURN(0);
223 }
224
225 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
226 {
227         if (lu_object_exists(o))
228                 return mdd_get_flags(env, lu2mdd_obj(o));
229         else
230                 return 0;
231 }
232
233 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
234 {
235         struct mdd_object *mdd = lu2mdd_obj(o);
236
237         lu_object_fini(o);
238         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
239 }
240
241 static int mdd_object_print(const struct lu_env *env, void *cookie,
242                             lu_printer_t p, const struct lu_object *o)
243 {
244         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
245         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
246                     "valid=%x, cltime="LPU64", flags=%lx)",
247                     mdd, mdd->mod_count, mdd->mod_valid,
248                     mdd->mod_cltime, mdd->mod_flags);
249 }
250
251 static const struct lu_object_operations mdd_lu_obj_ops = {
252         .loo_object_init    = mdd_object_init,
253         .loo_object_start   = mdd_object_start,
254         .loo_object_free    = mdd_object_free,
255         .loo_object_print   = mdd_object_print,
256 };
257
258 struct mdd_object *mdd_object_find(const struct lu_env *env,
259                                    struct mdd_device *d,
260                                    const struct lu_fid *f)
261 {
262         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
263 }
264
265 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
266                         const char *path, struct lu_fid *fid)
267 {
268         struct lu_buf *buf;
269         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
270         struct mdd_object *obj;
271         struct lu_name *lname = &mdd_env_info(env)->mti_name;
272         char *name;
273         int rc = 0;
274         ENTRY;
275
276         /* temp buffer for path element */
277         buf = mdd_buf_alloc(env, PATH_MAX);
278         if (buf->lb_buf == NULL)
279                 RETURN(-ENOMEM);
280
281         lname->ln_name = name = buf->lb_buf;
282         lname->ln_namelen = 0;
283         *f = mdd->mdd_root_fid;
284
285         while(1) {
286                 while (*path == '/')
287                         path++;
288                 if (*path == '\0')
289                         break;
290                 while (*path != '/' && *path != '\0') {
291                         *name = *path;
292                         path++;
293                         name++;
294                         lname->ln_namelen++;
295                 }
296
297                 *name = '\0';
298                 /* find obj corresponding to fid */
299                 obj = mdd_object_find(env, mdd, f);
300                 if (obj == NULL)
301                         GOTO(out, rc = -EREMOTE);
302                 if (IS_ERR(obj))
303                         GOTO(out, rc = PTR_ERR(obj));
304                 /* get child fid from parent and name */
305                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
306                 mdd_object_put(env, obj);
307                 if (rc)
308                         break;
309
310                 name = buf->lb_buf;
311                 lname->ln_namelen = 0;
312         }
313
314         if (!rc)
315                 *fid = *f;
316 out:
317         RETURN(rc);
318 }
319
320 /** The maximum depth that fid2path() will search.
321  * This is limited only because we want to store the fids for
322  * historical path lookup purposes.
323  */
324 #define MAX_PATH_DEPTH 100
325
326 /** mdd_path() lookup structure. */
327 struct path_lookup_info {
328         __u64                pli_recno;        /**< history point */
329         __u64                pli_currec;       /**< current record */
330         struct lu_fid        pli_fid;
331         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
332         struct mdd_object   *pli_mdd_obj;
333         char                *pli_path;         /**< full path */
334         int                  pli_pathlen;
335         int                  pli_linkno;       /**< which hardlink to follow */
336         int                  pli_fidcount;     /**< number of \a pli_fids */
337 };
338
339 static int mdd_path_current(const struct lu_env *env,
340                             struct path_lookup_info *pli)
341 {
342         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
343         struct mdd_object *mdd_obj;
344         struct lu_buf     *buf = NULL;
345         struct link_ea_header *leh;
346         struct link_ea_entry  *lee;
347         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
348         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
349         char *ptr;
350         int reclen;
351         int rc;
352         ENTRY;
353
354         ptr = pli->pli_path + pli->pli_pathlen - 1;
355         *ptr = 0;
356         --ptr;
357         pli->pli_fidcount = 0;
358         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
359
360         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
361                 mdd_obj = mdd_object_find(env, mdd,
362                                           &pli->pli_fids[pli->pli_fidcount]);
363                 if (mdd_obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(mdd_obj))
366                         GOTO(out, rc = PTR_ERR(mdd_obj));
367                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
368                 if (rc <= 0) {
369                         mdd_object_put(env, mdd_obj);
370                         if (rc == -1)
371                                 rc = -EREMOTE;
372                         else if (rc == 0)
373                                 /* Do I need to error out here? */
374                                 rc = -ENOENT;
375                         GOTO(out, rc);
376                 }
377
378                 /* Get parent fid and object name */
379                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
380                 buf = mdd_links_get(env, mdd_obj);
381                 mdd_read_unlock(env, mdd_obj);
382                 mdd_object_put(env, mdd_obj);
383                 if (IS_ERR(buf))
384                         GOTO(out, rc = PTR_ERR(buf));
385
386                 leh = buf->lb_buf;
387                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
388                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
389
390                 /* If set, use link #linkno for path lookup, otherwise use
391                    link #0.  Only do this for the final path element. */
392                 if ((pli->pli_fidcount == 0) &&
393                     (pli->pli_linkno < leh->leh_reccount)) {
394                         int count;
395                         for (count = 0; count < pli->pli_linkno; count++) {
396                                 lee = (struct link_ea_entry *)
397                                      ((char *)lee + reclen);
398                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
399                         }
400                         if (pli->pli_linkno < leh->leh_reccount - 1)
401                                 /* indicate to user there are more links */
402                                 pli->pli_linkno++;
403                 }
404
405                 /* Pack the name in the end of the buffer */
406                 ptr -= tmpname->ln_namelen;
407                 if (ptr - 1 <= pli->pli_path)
408                         GOTO(out, rc = -EOVERFLOW);
409                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
410                 *(--ptr) = '/';
411
412                 /* Store the parent fid for historic lookup */
413                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
414                         GOTO(out, rc = -EOVERFLOW);
415                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
416         }
417
418         /* Verify that our path hasn't changed since we started the lookup.
419            Record the current index, and verify the path resolves to the
420            same fid. If it does, then the path is correct as of this index. */
421         spin_lock(&mdd->mdd_cl.mc_lock);
422         pli->pli_currec = mdd->mdd_cl.mc_index;
423         spin_unlock(&mdd->mdd_cl.mc_lock);
424         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
425         if (rc) {
426                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
427                 GOTO (out, rc = -EAGAIN);
428         }
429         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
430                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
431                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
432                        PFID(&pli->pli_fid));
433                 GOTO(out, rc = -EAGAIN);
434         }
435         ptr++; /* skip leading / */
436         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
437
438         EXIT;
439 out:
440         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
441                 /* if we vmalloced a large buffer drop it */
442                 mdd_buf_put(buf);
443
444         return rc;
445 }
446
447 static int mdd_path_historic(const struct lu_env *env,
448                              struct path_lookup_info *pli)
449 {
450         return 0;
451 }
452
453 /* Returns the full path to this fid, as of changelog record recno. */
454 static int mdd_path(const struct lu_env *env, struct md_object *obj,
455                     char *path, int pathlen, __u64 *recno, int *linkno)
456 {
457         struct path_lookup_info *pli;
458         int tries = 3;
459         int rc = -EAGAIN;
460         ENTRY;
461
462         if (pathlen < 3)
463                 RETURN(-EOVERFLOW);
464
465         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
466                 path[0] = '\0';
467                 RETURN(0);
468         }
469
470         OBD_ALLOC_PTR(pli);
471         if (pli == NULL)
472                 RETURN(-ENOMEM);
473
474         pli->pli_mdd_obj = md2mdd_obj(obj);
475         pli->pli_recno = *recno;
476         pli->pli_path = path;
477         pli->pli_pathlen = pathlen;
478         pli->pli_linkno = *linkno;
479
480         /* Retry multiple times in case file is being moved */
481         while (tries-- && rc == -EAGAIN)
482                 rc = mdd_path_current(env, pli);
483
484         /* For historical path lookup, the current links may not have existed
485          * at "recno" time.  We must switch over to earlier links/parents
486          * by using the changelog records.  If the earlier parent doesn't
487          * exist, we must search back through the changelog to reconstruct
488          * its parents, then check if it exists, etc.
489          * We may ignore this problem for the initial implementation and
490          * state that an "original" hardlink must still exist for us to find
491          * historic path name. */
492         if (pli->pli_recno != -1) {
493                 rc = mdd_path_historic(env, pli);
494         } else {
495                 *recno = pli->pli_currec;
496                 /* Return next link index to caller */
497                 *linkno = pli->pli_linkno;
498         }
499
500         OBD_FREE_PTR(pli);
501
502         RETURN (rc);
503 }
504
505 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
506 {
507         struct lu_attr *la = &mdd_env_info(env)->mti_la;
508         int rc;
509
510         ENTRY;
511         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
512         if (rc == 0) {
513                 mdd_flags_xlate(obj, la->la_flags);
514         }
515         RETURN(rc);
516 }
517
518 /*
519  * No permission check is needed.
520  */
521 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
522                  struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526
527         return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
528                           mdd_object_capa(env, md2mdd_obj(obj)));
529         RETURN(rc);
530 }
531
532 /*
533  * No permission check is needed.
534  */
535 static int mdd_xattr_get(const struct lu_env *env,
536                          struct md_object *obj, struct lu_buf *buf,
537                          const char *name)
538 {
539         struct mdd_object *mdd_obj = md2mdd_obj(obj);
540         int rc;
541
542         ENTRY;
543
544         if (mdd_object_exists(mdd_obj) == 0) {
545                 CERROR("%s: object "DFID" not found: rc = -2\n",
546                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
547                 return -ENOENT;
548         }
549
550         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
551         rc = mdo_xattr_get(env, mdd_obj, buf, name,
552                            mdd_object_capa(env, mdd_obj));
553         mdd_read_unlock(env, mdd_obj);
554
555         RETURN(rc);
556 }
557
558 /*
559  * Permission check is done when open,
560  * no need check again.
561  */
562 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
563                         struct lu_buf *buf)
564 {
565         struct mdd_object *mdd_obj = md2mdd_obj(obj);
566         struct dt_object  *next;
567         loff_t             pos = 0;
568         int                rc;
569         ENTRY;
570
571         if (mdd_object_exists(mdd_obj) == 0) {
572                 CERROR("%s: object "DFID" not found: rc = -2\n",
573                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
574                 return -ENOENT;
575         }
576
577         next = mdd_object_child(mdd_obj);
578         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
579         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
580                                          mdd_object_capa(env, mdd_obj));
581         mdd_read_unlock(env, mdd_obj);
582         RETURN(rc);
583 }
584
585 /*
586  * No permission check is needed.
587  */
588 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
589                           struct lu_buf *buf)
590 {
591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
592         int rc;
593
594         ENTRY;
595
596         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
597         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
598         mdd_read_unlock(env, mdd_obj);
599
600         RETURN(rc);
601 }
602
603 int mdd_declare_object_create_internal(const struct lu_env *env,
604                                        struct mdd_object *p,
605                                        struct mdd_object *c,
606                                        struct lu_attr *attr,
607                                        struct thandle *handle,
608                                        const struct md_op_spec *spec)
609 {
610         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
611         const struct dt_index_features *feat = spec->sp_feat;
612         int rc;
613         ENTRY;
614
615         if (feat != &dt_directory_features && feat != NULL) {
616                 dof->dof_type = DFT_INDEX;
617                 dof->u.dof_idx.di_feat = feat;
618
619         } else {
620                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
621                 if (dof->dof_type == DFT_REGULAR) {
622                         dof->u.dof_reg.striped =
623                                 md_should_create(spec->sp_cr_flags);
624                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
625                                 dof->u.dof_reg.striped = 0;
626                         /* is this replay? */
627                         if (spec->no_create)
628                                 dof->u.dof_reg.striped = 0;
629                 }
630         }
631
632         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
633
634         RETURN(rc);
635 }
636
637 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
638                                struct mdd_object *c, struct lu_attr *attr,
639                                struct thandle *handle,
640                                const struct md_op_spec *spec)
641 {
642         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
643         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
644         int rc;
645         ENTRY;
646
647         LASSERT(!mdd_object_exists(c));
648
649         rc = mdo_create_obj(env, c, attr, hint, dof, handle);
650
651         LASSERT(ergo(rc == 0, mdd_object_exists(c)));
652
653         RETURN(rc);
654 }
655
656 /**
657  * Make sure the ctime is increased only.
658  */
659 static inline int mdd_attr_check(const struct lu_env *env,
660                                  struct mdd_object *obj,
661                                  struct lu_attr *attr)
662 {
663         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
664         int rc;
665         ENTRY;
666
667         if (attr->la_valid & LA_CTIME) {
668                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
669                 if (rc)
670                         RETURN(rc);
671
672                 if (attr->la_ctime < tmp_la->la_ctime)
673                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
674                 else if (attr->la_valid == LA_CTIME &&
675                          attr->la_ctime == tmp_la->la_ctime)
676                         attr->la_valid &= ~LA_CTIME;
677         }
678         RETURN(0);
679 }
680
681 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
682                           struct lu_attr *attr, struct thandle *handle,
683                           int needacl)
684 {
685         int rc;
686         ENTRY;
687
688         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
689 #ifdef CONFIG_FS_POSIX_ACL
690         if (!rc && (attr->la_valid & LA_MODE) && needacl)
691                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
692 #endif
693         RETURN(rc);
694 }
695
696 int mdd_attr_check_set_internal(const struct lu_env *env,
697                                 struct mdd_object *obj, struct lu_attr *attr,
698                                 struct thandle *handle, int needacl)
699 {
700         int rc;
701         ENTRY;
702
703         rc = mdd_attr_check(env, obj, attr);
704         if (rc)
705                 RETURN(rc);
706
707         if (attr->la_valid)
708                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
709         RETURN(rc);
710 }
711
712 /*
713  * This gives the same functionality as the code between
714  * sys_chmod and inode_setattr
715  * chown_common and inode_setattr
716  * utimes and inode_setattr
717  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
718  */
719 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
720                         struct lu_attr *la, const unsigned long flags)
721 {
722         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
723         struct lu_ucred  *uc;
724         int               rc;
725         ENTRY;
726
727         if (!la->la_valid)
728                 RETURN(0);
729
730         /* Do not permit change file type */
731         if (la->la_valid & LA_TYPE)
732                 RETURN(-EPERM);
733
734         /* They should not be processed by setattr */
735         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
736                 RETURN(-EPERM);
737
738         /* export destroy does not have ->le_ses, but we may want
739          * to drop LUSTRE_SOM_FL. */
740         uc = lu_ucred_check(env);
741         if (uc == NULL)
742                 RETURN(0);
743
744         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
745         if (rc)
746                 RETURN(rc);
747
748         if (la->la_valid == LA_CTIME) {
749                 if (!(flags & MDS_PERM_BYPASS))
750                         /* This is only for set ctime when rename's source is
751                          * on remote MDS. */
752                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
753                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
754                         la->la_valid &= ~LA_CTIME;
755                 RETURN(rc);
756         }
757
758         if (la->la_valid == LA_ATIME) {
759                 /* This is atime only set for read atime update on close. */
760                 if (la->la_atime >= tmp_la->la_atime &&
761                     la->la_atime < (tmp_la->la_atime +
762                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
763                         la->la_valid &= ~LA_ATIME;
764                 RETURN(0);
765         }
766
767         /* Check if flags change. */
768         if (la->la_valid & LA_FLAGS) {
769                 unsigned int oldflags = 0;
770                 unsigned int newflags = la->la_flags &
771                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
772
773                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
774                     !mdd_capable(uc, CFS_CAP_FOWNER))
775                         RETURN(-EPERM);
776
777                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
778                  * only be changed by the relevant capability. */
779                 if (mdd_is_immutable(obj))
780                         oldflags |= LUSTRE_IMMUTABLE_FL;
781                 if (mdd_is_append(obj))
782                         oldflags |= LUSTRE_APPEND_FL;
783                 if ((oldflags ^ newflags) &&
784                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
785                         RETURN(-EPERM);
786
787                 if (!S_ISDIR(tmp_la->la_mode))
788                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
789         }
790
791         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
792             (la->la_valid & ~LA_FLAGS) &&
793             !(flags & MDS_PERM_BYPASS))
794                 RETURN(-EPERM);
795
796         /* Check for setting the obj time. */
797         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
798             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
799                 if ((uc->uc_fsuid != tmp_la->la_uid) &&
800                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
801                         rc = mdd_permission_internal(env, obj, tmp_la,
802                                                      MAY_WRITE);
803                         if (rc)
804                                 RETURN(rc);
805                 }
806         }
807
808         if (la->la_valid & LA_KILL_SUID) {
809                 la->la_valid &= ~LA_KILL_SUID;
810                 if ((tmp_la->la_mode & S_ISUID) &&
811                     !(la->la_valid & LA_MODE)) {
812                         la->la_mode = tmp_la->la_mode;
813                         la->la_valid |= LA_MODE;
814                 }
815                 la->la_mode &= ~S_ISUID;
816         }
817
818         if (la->la_valid & LA_KILL_SGID) {
819                 la->la_valid &= ~LA_KILL_SGID;
820                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
821                                         (S_ISGID | S_IXGRP)) &&
822                     !(la->la_valid & LA_MODE)) {
823                         la->la_mode = tmp_la->la_mode;
824                         la->la_valid |= LA_MODE;
825                 }
826                 la->la_mode &= ~S_ISGID;
827         }
828
829         /* Make sure a caller can chmod. */
830         if (la->la_valid & LA_MODE) {
831                 if (!(flags & MDS_PERM_BYPASS) &&
832                     (uc->uc_fsuid != tmp_la->la_uid) &&
833                     !mdd_capable(uc, CFS_CAP_FOWNER))
834                         RETURN(-EPERM);
835
836                 if (la->la_mode == (cfs_umode_t) -1)
837                         la->la_mode = tmp_la->la_mode;
838                 else
839                         la->la_mode = (la->la_mode & S_IALLUGO) |
840                                       (tmp_la->la_mode & ~S_IALLUGO);
841
842                 /* Also check the setgid bit! */
843                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
844                                        la->la_gid : tmp_la->la_gid) &&
845                     !mdd_capable(uc, CFS_CAP_FSETID))
846                         la->la_mode &= ~S_ISGID;
847         } else {
848                la->la_mode = tmp_la->la_mode;
849         }
850
851         /* Make sure a caller can chown. */
852         if (la->la_valid & LA_UID) {
853                 if (la->la_uid == (uid_t) -1)
854                         la->la_uid = tmp_la->la_uid;
855                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
856                      (la->la_uid != tmp_la->la_uid)) &&
857                     !mdd_capable(uc, CFS_CAP_CHOWN))
858                         RETURN(-EPERM);
859
860                 /* If the user or group of a non-directory has been
861                  * changed by a non-root user, remove the setuid bit.
862                  * 19981026 David C Niemi <niemi@tux.org>
863                  *
864                  * Changed this to apply to all users, including root,
865                  * to avoid some races. This is the behavior we had in
866                  * 2.0. The check for non-root was definitely wrong
867                  * for 2.2 anyway, as it should have been using
868                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
869                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
870                     !S_ISDIR(tmp_la->la_mode)) {
871                         la->la_mode &= ~S_ISUID;
872                         la->la_valid |= LA_MODE;
873                 }
874         }
875
876         /* Make sure caller can chgrp. */
877         if (la->la_valid & LA_GID) {
878                 if (la->la_gid == (gid_t) -1)
879                         la->la_gid = tmp_la->la_gid;
880                 if (((uc->uc_fsuid != tmp_la->la_uid) ||
881                      ((la->la_gid != tmp_la->la_gid) &&
882                       !lustre_in_group_p(uc, la->la_gid))) &&
883                     !mdd_capable(uc, CFS_CAP_CHOWN))
884                         RETURN(-EPERM);
885
886                 /* Likewise, if the user or group of a non-directory
887                  * has been changed by a non-root user, remove the
888                  * setgid bit UNLESS there is no group execute bit
889                  * (this would be a file marked for mandatory
890                  * locking).  19981026 David C Niemi <niemi@tux.org>
891                  *
892                  * Removed the fsuid check (see the comment above) --
893                  * 19990830 SD. */
894                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
895                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
896                         la->la_mode &= ~S_ISGID;
897                         la->la_valid |= LA_MODE;
898                 }
899         }
900
901         /* For both Size-on-MDS case and truncate case,
902          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
903          * We distinguish them by "flags & MDS_SOM".
904          * For SOM case, it is true, the MAY_WRITE perm has been checked
905          * when open, no need check again. For truncate case, it is false,
906          * the MAY_WRITE perm should be checked here. */
907         if (flags & MDS_SOM) {
908                 /* For the "Size-on-MDS" setattr update, merge coming
909                  * attributes with the set in the inode. BUG 10641 */
910                 if ((la->la_valid & LA_ATIME) &&
911                     (la->la_atime <= tmp_la->la_atime))
912                         la->la_valid &= ~LA_ATIME;
913
914                 /* OST attributes do not have a priority over MDS attributes,
915                  * so drop times if ctime is equal. */
916                 if ((la->la_valid & LA_CTIME) &&
917                     (la->la_ctime <= tmp_la->la_ctime))
918                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
919         } else {
920                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
921                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
922                               (uc->uc_fsuid == tmp_la->la_uid)) &&
923                             !(flags & MDS_PERM_BYPASS)) {
924                                 rc = mdd_permission_internal(env, obj,
925                                                              tmp_la, MAY_WRITE);
926                                 if (rc)
927                                         RETURN(rc);
928                         }
929                 }
930                 if (la->la_valid & LA_CTIME) {
931                         /* The pure setattr, it has the priority over what is
932                          * already set, do not drop it if ctime is equal. */
933                         if (la->la_ctime < tmp_la->la_ctime)
934                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
935                                                   LA_CTIME);
936                 }
937         }
938
939         RETURN(0);
940 }
941
942 /** Store a data change changelog record
943  * If this fails, we must fail the whole transaction; we don't
944  * want the change to commit without the log entry.
945  * \param mdd_obj - mdd_object of change
946  * \param handle - transacion handle
947  */
948 static int mdd_changelog_data_store(const struct lu_env *env,
949                                     struct mdd_device *mdd,
950                                     enum changelog_rec_type type,
951                                     int flags, struct mdd_object *mdd_obj,
952                                     struct thandle *handle)
953 {
954         const struct lu_fid             *tfid = mdo2fid(mdd_obj);
955         struct llog_changelog_rec       *rec;
956         struct lu_buf                   *buf;
957         int                              reclen;
958         int                              rc;
959
960         /* Not recording */
961         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
962                 RETURN(0);
963         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
964                 RETURN(0);
965
966         LASSERT(mdd_obj != NULL);
967         LASSERT(handle != NULL);
968
969         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
970             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
971                 /* Don't need multiple updates in this log */
972                 /* Don't check under lock - no big deal if we get an extra
973                    entry */
974                 RETURN(0);
975         }
976
977         reclen = llog_data_len(sizeof(*rec));
978         buf = mdd_buf_alloc(env, reclen);
979         if (buf->lb_buf == NULL)
980                 RETURN(-ENOMEM);
981         rec = buf->lb_buf;
982
983         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
984         rec->cr.cr_type = (__u32)type;
985         rec->cr.cr_tfid = *tfid;
986         rec->cr.cr_namelen = 0;
987         mdd_obj->mod_cltime = cfs_time_current_64();
988
989         rc = mdd_changelog_store(env, mdd, rec, handle);
990
991         RETURN(rc);
992 }
993
994 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
995                   int flags, struct md_object *obj)
996 {
997         struct thandle *handle;
998         struct mdd_object *mdd_obj = md2mdd_obj(obj);
999         struct mdd_device *mdd = mdo2mdd(obj);
1000         int rc;
1001         ENTRY;
1002
1003         handle = mdd_trans_create(env, mdd);
1004         if (IS_ERR(handle))
1005                 RETURN(PTR_ERR(handle));
1006
1007         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1008         if (rc)
1009                 GOTO(stop, rc);
1010
1011         rc = mdd_trans_start(env, mdd, handle);
1012         if (rc)
1013                 GOTO(stop, rc);
1014
1015         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1016                                       handle);
1017
1018 stop:
1019         mdd_trans_stop(env, mdd, rc, handle);
1020
1021         RETURN(rc);
1022 }
1023
1024 /**
1025  * Save LMA extended attributes with data from \a ma.
1026  *
1027  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1028  * not, LMA EA will be first read from disk, modified and write back.
1029  *
1030  */
1031 /* Precedence for choosing record type when multiple
1032  * attributes change: setattr > mtime > ctime > atime
1033  * (ctime changes when mtime does, plus chmod/chown.
1034  * atime and ctime are independent.) */
1035 static int mdd_attr_set_changelog(const struct lu_env *env,
1036                                   struct md_object *obj, struct thandle *handle,
1037                                   __u64 valid)
1038 {
1039         struct mdd_device *mdd = mdo2mdd(obj);
1040         int bits, type = 0;
1041
1042         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1043         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1044         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1045         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1046         bits = bits & mdd->mdd_cl.mc_mask;
1047         if (bits == 0)
1048                 return 0;
1049
1050         /* The record type is the lowest non-masked set bit */
1051         while (bits && ((bits & 1) == 0)) {
1052                 bits = bits >> 1;
1053                 type++;
1054         }
1055
1056         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1057         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1058                                         md2mdd_obj(obj), handle);
1059 }
1060
1061 static int mdd_declare_attr_set(const struct lu_env *env,
1062                                 struct mdd_device *mdd,
1063                                 struct mdd_object *obj,
1064                                 const struct lu_attr *attr,
1065                                 struct thandle *handle)
1066 {
1067         int rc;
1068
1069         rc = mdo_declare_attr_set(env, obj, attr, handle);
1070         if (rc)
1071                 return rc;
1072
1073 #ifdef CONFIG_FS_POSIX_ACL
1074         if (attr->la_valid & LA_MODE) {
1075                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1076                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1077                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1078                 mdd_read_unlock(env, obj);
1079                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1080                         rc = 0;
1081                 else if (rc < 0)
1082                         return rc;
1083
1084                 if (rc != 0) {
1085                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1086                         rc = mdo_declare_xattr_set(env, obj, buf,
1087                                                    XATTR_NAME_ACL_ACCESS, 0,
1088                                                    handle);
1089                         if (rc)
1090                                 return rc;
1091                 }
1092         }
1093 #endif
1094
1095         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1096         return rc;
1097 }
1098
1099 /* set attr and LOV EA at once, return updated attr */
1100 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1101                  const struct md_attr *ma)
1102 {
1103         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1104         struct mdd_device *mdd = mdo2mdd(obj);
1105         struct thandle *handle;
1106         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1107         const struct lu_attr *la = &ma->ma_attr;
1108         int rc;
1109         ENTRY;
1110
1111         /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
1112         LASSERT((ma->ma_valid & MA_LOV) == 0);
1113         LASSERT((ma->ma_valid & MA_HSM) == 0);
1114         LASSERT((ma->ma_valid & MA_SOM) == 0);
1115
1116         *la_copy = ma->ma_attr;
1117         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1118         if (rc)
1119                 RETURN(rc);
1120
1121         /* setattr on "close" only change atime, or do nothing */
1122         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1123                 RETURN(0);
1124
1125         handle = mdd_trans_create(env, mdd);
1126         if (IS_ERR(handle))
1127                 RETURN(PTR_ERR(handle));
1128
1129         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
1130         if (rc)
1131                 GOTO(stop, rc);
1132
1133         rc = mdd_trans_start(env, mdd, handle);
1134         if (rc)
1135                 GOTO(stop, rc);
1136
1137         /* permission changes may require sync operation */
1138         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1139                 handle->th_sync |= !!mdd->mdd_sync_permission;
1140
1141         if (la->la_valid & (LA_MTIME | LA_CTIME))
1142                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1143                        la->la_mtime, la->la_ctime);
1144
1145         if (la_copy->la_valid & LA_FLAGS) {
1146                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1147                 if (rc == 0)
1148                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1149         } else if (la_copy->la_valid) {            /* setattr */
1150                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1151         }
1152
1153         if (rc == 0)
1154                 rc = mdd_attr_set_changelog(env, obj, handle,
1155                                             la->la_valid);
1156 stop:
1157         mdd_trans_stop(env, mdd, rc, handle);
1158         RETURN(rc);
1159 }
1160
1161 static int mdd_xattr_sanity_check(const struct lu_env *env,
1162                                   struct mdd_object *obj)
1163 {
1164         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1165         struct lu_ucred *uc     = lu_ucred_assert(env);
1166         int rc;
1167         ENTRY;
1168
1169         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1170                 RETURN(-EPERM);
1171
1172         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1173         if (rc)
1174                 RETURN(rc);
1175
1176         if ((uc->uc_fsuid != tmp_la->la_uid) &&
1177             !mdd_capable(uc, CFS_CAP_FOWNER))
1178                 RETURN(-EPERM);
1179
1180         RETURN(rc);
1181 }
1182
1183 static int mdd_declare_xattr_set(const struct lu_env *env,
1184                                  struct mdd_device *mdd,
1185                                  struct mdd_object *obj,
1186                                  const struct lu_buf *buf,
1187                                  const char *name,
1188                                  struct thandle *handle)
1189 {
1190         int rc;
1191
1192         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1193         if (rc)
1194                 return rc;
1195
1196         /* Only record user xattr changes */
1197         if ((strncmp("user.", name, 5) == 0))
1198                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1199
1200         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1201         return rc;
1202 }
1203
1204 /**
1205  * The caller should guarantee to update the object ctime
1206  * after xattr_set if needed.
1207  */
1208 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1209                          const struct lu_buf *buf, const char *name,
1210                          int fl)
1211 {
1212         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1213         struct mdd_device *mdd = mdo2mdd(obj);
1214         struct thandle *handle;
1215         int  rc;
1216         ENTRY;
1217
1218         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1219                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1220                 RETURN(rc);
1221         }
1222
1223         rc = mdd_xattr_sanity_check(env, mdd_obj);
1224         if (rc)
1225                 RETURN(rc);
1226
1227         handle = mdd_trans_create(env, mdd);
1228         if (IS_ERR(handle))
1229                 RETURN(PTR_ERR(handle));
1230
1231         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1232         if (rc)
1233                 GOTO(stop, rc);
1234
1235         rc = mdd_trans_start(env, mdd, handle);
1236         if (rc)
1237                 GOTO(stop, rc);
1238
1239         /* security-replated changes may require sync */
1240         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1241                 handle->th_sync |= !!mdd->mdd_sync_permission;
1242
1243         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1244         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1245                            mdd_object_capa(env, mdd_obj));
1246         mdd_write_unlock(env, mdd_obj);
1247         if (rc)
1248                 GOTO(stop, rc);
1249
1250         /* Only record system & user xattr changes */
1251         if (strncmp(XATTR_USER_PREFIX, name,
1252                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1253                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1254                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1255                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1256                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1257                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1258                                               handle);
1259
1260 stop:
1261         mdd_trans_stop(env, mdd, rc, handle);
1262
1263         RETURN(rc);
1264 }
1265
1266 static int mdd_declare_xattr_del(const struct lu_env *env,
1267                                  struct mdd_device *mdd,
1268                                  struct mdd_object *obj,
1269                                  const char *name,
1270                                  struct thandle *handle)
1271 {
1272         int rc;
1273
1274         rc = mdo_declare_xattr_del(env, obj, name, handle);
1275         if (rc)
1276                 return rc;
1277
1278         /* Only record user xattr changes */
1279         if ((strncmp("user.", name, 5) == 0))
1280                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1281
1282         return rc;
1283 }
1284
1285 /**
1286  * The caller should guarantee to update the object ctime
1287  * after xattr_set if needed.
1288  */
1289 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1290                   const char *name)
1291 {
1292         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1293         struct mdd_device *mdd = mdo2mdd(obj);
1294         struct thandle *handle;
1295         int  rc;
1296         ENTRY;
1297
1298         rc = mdd_xattr_sanity_check(env, mdd_obj);
1299         if (rc)
1300                 RETURN(rc);
1301
1302         handle = mdd_trans_create(env, mdd);
1303         if (IS_ERR(handle))
1304                 RETURN(PTR_ERR(handle));
1305
1306         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1307         if (rc)
1308                 GOTO(stop, rc);
1309
1310         rc = mdd_trans_start(env, mdd, handle);
1311         if (rc)
1312                 GOTO(stop, rc);
1313
1314         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1315         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1316                            mdd_object_capa(env, mdd_obj));
1317         mdd_write_unlock(env, mdd_obj);
1318         if (rc)
1319                 GOTO(stop, rc);
1320
1321         /* Only record system & user xattr changes */
1322         if (strncmp(XATTR_USER_PREFIX, name,
1323                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1324                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1325                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1326                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1327                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1328                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1329                                               handle);
1330
1331 stop:
1332         mdd_trans_stop(env, mdd, rc, handle);
1333
1334         RETURN(rc);
1335 }
1336
1337 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1338                 struct mdd_object *child, struct lu_attr *attr)
1339 {
1340         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1341         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1342         struct dt_object *nc = mdd_object_child(child);
1343
1344         /* @hint will be initialized by underlying device. */
1345         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1346 }
1347
1348 /*
1349  * do NOT or the MAY_*'s, you'll get the weakest
1350  */
1351 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1352 {
1353         int res = 0;
1354
1355         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1356          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1357          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1358          * owner can write to a file even if it is marked readonly to hide
1359          * its brokenness. (bug 5781) */
1360         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1361                 struct lu_ucred *uc = lu_ucred_check(env);
1362
1363                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
1364                         return 0;
1365         }
1366
1367         if (flags & FMODE_READ)
1368                 res |= MAY_READ;
1369         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1370                 res |= MAY_WRITE;
1371         if (flags & MDS_FMODE_EXEC)
1372                 res = MAY_EXEC;
1373         return res;
1374 }
1375
1376 static int mdd_open_sanity_check(const struct lu_env *env,
1377                                  struct mdd_object *obj, int flag)
1378 {
1379         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1380         int mode, rc;
1381         ENTRY;
1382
1383         /* EEXIST check */
1384         if (mdd_is_dead_obj(obj))
1385                 RETURN(-ENOENT);
1386
1387         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1388         if (rc)
1389                RETURN(rc);
1390
1391         if (S_ISLNK(tmp_la->la_mode))
1392                 RETURN(-ELOOP);
1393
1394         mode = accmode(env, tmp_la, flag);
1395
1396         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1397                 RETURN(-EISDIR);
1398
1399         if (!(flag & MDS_OPEN_CREATED)) {
1400                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1401                 if (rc)
1402                         RETURN(rc);
1403         }
1404
1405         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1406             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1407                 flag &= ~MDS_OPEN_TRUNC;
1408
1409         /* For writing append-only file must open it with append mode. */
1410         if (mdd_is_append(obj)) {
1411                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1412                         RETURN(-EPERM);
1413                 if (flag & MDS_OPEN_TRUNC)
1414                         RETURN(-EPERM);
1415         }
1416
1417 #if 0
1418         /*
1419          * Now, flag -- O_NOATIME does not be packed by client.
1420          */
1421         if (flag & O_NOATIME) {
1422                 struct lu_ucred *uc = lu_ucred(env);
1423
1424                 if (uc && ((uc->uc_valid == UCRED_OLD) ||
1425                            (uc->uc_valid == UCRED_NEW)) &&
1426                     (uc->uc_fsuid != tmp_la->la_uid) &&
1427                     !mdd_capable(uc, CFS_CAP_FOWNER))
1428                         RETURN(-EPERM);
1429         }
1430 #endif
1431
1432         RETURN(0);
1433 }
1434
1435 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1436                     int flags)
1437 {
1438         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1439         int rc = 0;
1440
1441         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1442
1443         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1444         if (rc == 0)
1445                 mdd_obj->mod_count++;
1446
1447         mdd_write_unlock(env, mdd_obj);
1448         return rc;
1449 }
1450
1451 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1452                             struct md_attr *ma, struct thandle *handle)
1453 {
1454         return mdo_declare_destroy(env, obj, handle);
1455 }
1456
1457 /* return md_attr back,
1458  * if it is last unlink then return lov ea + llog cookie*/
1459 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1460                     struct md_attr *ma, struct thandle *handle)
1461 {
1462         int rc;
1463         ENTRY;
1464
1465         rc = mdo_destroy(env, obj, handle);
1466
1467         RETURN(rc);
1468 }
1469
1470 static int mdd_declare_close(const struct lu_env *env,
1471                              struct mdd_object *obj,
1472                              struct md_attr *ma,
1473                              struct thandle *handle)
1474 {
1475         int rc;
1476
1477         rc = orph_declare_index_delete(env, obj, handle);
1478         if (rc)
1479                 return rc;
1480
1481         return mdo_declare_destroy(env, obj, handle);
1482 }
1483
1484 /*
1485  * No permission check is needed.
1486  */
1487 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1488                      struct md_attr *ma, int mode)
1489 {
1490         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1491         struct mdd_device *mdd = mdo2mdd(obj);
1492         struct thandle    *handle = NULL;
1493         int rc, is_orphan = 0;
1494         ENTRY;
1495
1496         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1497                 mdd_obj->mod_count--;
1498
1499                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1500                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1501                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1502                 RETURN(0);
1503         }
1504
1505         /* check without any lock */
1506         if (mdd_obj->mod_count == 1 &&
1507             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1508  again:
1509                 handle = mdd_trans_create(env, mdo2mdd(obj));
1510                 if (IS_ERR(handle))
1511                         RETURN(PTR_ERR(handle));
1512
1513                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1514                 if (rc)
1515                         GOTO(stop, rc);
1516
1517                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1518                 if (rc)
1519                         GOTO(stop, rc);
1520
1521                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1522                 if (rc)
1523                         GOTO(stop, rc);
1524         }
1525
1526         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1527         if (handle == NULL && mdd_obj->mod_count == 1 &&
1528             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1529                 mdd_write_unlock(env, mdd_obj);
1530                 goto again;
1531         }
1532
1533         /* release open count */
1534         mdd_obj->mod_count --;
1535
1536         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1537                 /* remove link to object from orphan index */
1538                 LASSERT(handle != NULL);
1539                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1540                 if (rc == 0) {
1541                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1542                                "list, OSS objects to be destroyed.\n",
1543                                PFID(mdd_object_fid(mdd_obj)));
1544                         is_orphan = 1;
1545                 } else {
1546                         CERROR("Object "DFID" can not be deleted from orphan "
1547                                 "list, maybe cause OST objects can not be "
1548                                 "destroyed (err: %d).\n",
1549                                 PFID(mdd_object_fid(mdd_obj)), rc);
1550                         /* If object was not deleted from orphan list, do not
1551                          * destroy OSS objects, which will be done when next
1552                          * recovery. */
1553                         GOTO(out, rc);
1554                 }
1555         }
1556
1557         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
1558                         mdd_object_capa(env, mdd_obj));
1559         /* Object maybe not in orphan list originally, it is rare case for
1560          * mdd_finish_unlink() failure. */
1561         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
1562                 if (handle == NULL) {
1563                         handle = mdd_trans_create(env, mdo2mdd(obj));
1564                         if (IS_ERR(handle))
1565                                 GOTO(out, rc = PTR_ERR(handle));
1566
1567                         rc = mdo_declare_destroy(env, mdd_obj, handle);
1568                         if (rc)
1569                                 GOTO(out, rc);
1570
1571                         rc = mdd_declare_changelog_store(env, mdd,
1572                                         NULL, handle);
1573                         if (rc)
1574                                 GOTO(stop, rc);
1575
1576                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1577                         if (rc)
1578                                 GOTO(out, rc);
1579                 }
1580
1581                 rc = mdo_destroy(env, mdd_obj, handle);
1582
1583                 if (rc != 0)
1584                         CERROR("Error when prepare to delete Object "DFID" , "
1585                                "which will cause OST objects can not be "
1586                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1587         }
1588         EXIT;
1589
1590 out:
1591
1592         mdd_write_unlock(env, mdd_obj);
1593
1594         if (rc == 0 &&
1595             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
1596             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
1597                 if (handle == NULL) {
1598                         handle = mdd_trans_create(env, mdo2mdd(obj));
1599                         if (IS_ERR(handle))
1600                                 GOTO(stop, rc = IS_ERR(handle));
1601
1602                         rc = mdd_declare_changelog_store(env, mdd, NULL,
1603                                                          handle);
1604                         if (rc)
1605                                 GOTO(stop, rc);
1606
1607                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1608                         if (rc)
1609                                 GOTO(stop, rc);
1610                 }
1611
1612                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
1613                                          mdd_obj, handle);
1614         }
1615
1616 stop:
1617         if (handle != NULL)
1618                 mdd_trans_stop(env, mdd, rc, handle);
1619         return rc;
1620 }
1621
1622 /*
1623  * Permission check is done when open,
1624  * no need check again.
1625  */
1626 static int mdd_readpage_sanity_check(const struct lu_env *env,
1627                                      struct mdd_object *obj)
1628 {
1629         struct dt_object *next = mdd_object_child(obj);
1630         int rc;
1631         ENTRY;
1632
1633         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1634                 rc = 0;
1635         else
1636                 rc = -ENOTDIR;
1637
1638         RETURN(rc);
1639 }
1640
1641 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
1642                               int nob, const struct dt_it_ops *iops,
1643                               struct dt_it *it, __u32 attr, void *arg)
1644 {
1645         struct lu_dirpage       *dp = &lp->lp_dir;
1646         void                    *area = dp;
1647         int                      result;
1648         __u64                    hash = 0;
1649         struct lu_dirent        *ent;
1650         struct lu_dirent        *last = NULL;
1651         int                      first = 1;
1652
1653         memset(area, 0, sizeof (*dp));
1654         area += sizeof (*dp);
1655         nob  -= sizeof (*dp);
1656
1657         ent  = area;
1658         do {
1659                 int    len;
1660                 int    recsize;
1661
1662                 len = iops->key_size(env, it);
1663
1664                 /* IAM iterator can return record with zero len. */
1665                 if (len == 0)
1666                         goto next;
1667
1668                 hash = iops->store(env, it);
1669                 if (unlikely(first)) {
1670                         first = 0;
1671                         dp->ldp_hash_start = cpu_to_le64(hash);
1672                 }
1673
1674                 /* calculate max space required for lu_dirent */
1675                 recsize = lu_dirent_calc_size(len, attr);
1676
1677                 if (nob >= recsize) {
1678                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
1679                         if (result == -ESTALE)
1680                                 goto next;
1681                         if (result != 0)
1682                                 goto out;
1683
1684                         /* osd might not able to pack all attributes,
1685                          * so recheck rec length */
1686                         recsize = le16_to_cpu(ent->lde_reclen);
1687                 } else {
1688                         result = (last != NULL) ? 0 :-EINVAL;
1689                         goto out;
1690                 }
1691                 last = ent;
1692                 ent = (void *)ent + recsize;
1693                 nob -= recsize;
1694
1695 next:
1696                 result = iops->next(env, it);
1697                 if (result == -ESTALE)
1698                         goto next;
1699         } while (result == 0);
1700
1701 out:
1702         dp->ldp_hash_end = cpu_to_le64(hash);
1703         if (last != NULL) {
1704                 if (last->lde_hash == dp->ldp_hash_end)
1705                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
1706                 last->lde_reclen = 0; /* end mark */
1707         }
1708         if (result > 0)
1709                 /* end of directory */
1710                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
1711         if (result < 0)
1712                 CWARN("build page failed: %d!\n", result);
1713         return result;
1714 }
1715
1716 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1717                  const struct lu_rdpg *rdpg)
1718 {
1719         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1720         int rc;
1721         ENTRY;
1722
1723         if (mdd_object_exists(mdd_obj) == 0) {
1724                 CERROR("%s: object "DFID" not found: rc = -2\n",
1725                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1726                 return -ENOENT;
1727         }
1728
1729         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
1730         rc = mdd_readpage_sanity_check(env, mdd_obj);
1731         if (rc)
1732                 GOTO(out_unlock, rc);
1733
1734         if (mdd_is_dead_obj(mdd_obj)) {
1735                 struct page *pg;
1736                 struct lu_dirpage *dp;
1737
1738                 /*
1739                  * According to POSIX, please do not return any entry to client:
1740                  * even dot and dotdot should not be returned.
1741                  */
1742                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
1743                        PFID(mdd_object_fid(mdd_obj)));
1744
1745                 if (rdpg->rp_count <= 0)
1746                         GOTO(out_unlock, rc = -EFAULT);
1747                 LASSERT(rdpg->rp_pages != NULL);
1748
1749                 pg = rdpg->rp_pages[0];
1750                 dp = (struct lu_dirpage*)cfs_kmap(pg);
1751                 memset(dp, 0 , sizeof(struct lu_dirpage));
1752                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1753                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
1754                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1755                 cfs_kunmap(pg);
1756                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
1757         }
1758
1759         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
1760                            mdd_dir_page_build, NULL);
1761         if (rc >= 0) {
1762                 struct lu_dirpage       *dp;
1763
1764                 dp = cfs_kmap(rdpg->rp_pages[0]);
1765                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
1766                 if (rc == 0) {
1767                         /*
1768                          * No pages were processed, mark this for first page
1769                          * and send back.
1770                          */
1771                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
1772                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
1773                 }
1774                 cfs_kunmap(rdpg->rp_pages[0]);
1775         }
1776
1777         GOTO(out_unlock, rc);
1778 out_unlock:
1779         mdd_read_unlock(env, mdd_obj);
1780         return rc;
1781 }
1782
1783 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
1784 {
1785         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1786
1787         if (mdd_object_exists(mdd_obj) == 0) {
1788                 CERROR("%s: object "DFID" not found: rc = -2\n",
1789                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1790                 return -ENOENT;
1791         }
1792         return dt_object_sync(env, mdd_object_child(mdd_obj));
1793 }
1794
1795 const struct md_object_operations mdd_obj_ops = {
1796         .moo_permission    = mdd_permission,
1797         .moo_attr_get      = mdd_attr_get,
1798         .moo_attr_set      = mdd_attr_set,
1799         .moo_xattr_get     = mdd_xattr_get,
1800         .moo_xattr_set     = mdd_xattr_set,
1801         .moo_xattr_list    = mdd_xattr_list,
1802         .moo_xattr_del     = mdd_xattr_del,
1803         .moo_open          = mdd_open,
1804         .moo_close         = mdd_close,
1805         .moo_readpage      = mdd_readpage,
1806         .moo_readlink      = mdd_readlink,
1807         .moo_changelog     = mdd_changelog,
1808         .moo_capa_get      = mdd_capa_get,
1809         .moo_object_sync   = mdd_object_sync,
1810         .moo_path          = mdd_path,
1811 };