Whamcloud - gitweb
LU-1146 build: batch update copyright messages
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/mdd/mdd_object.c
39  *
40  * Lustre Metadata Server (mdd) routines
41  *
42  * Author: Wang Di <wangdi@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_MDS
49
50 #include <linux/module.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55 /* fid_be_cpu(), fid_cpu_to_be(). */
56 #include <lustre_fid.h>
57 #include <obd_lov.h>
58
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
62
63 #include "mdd_internal.h"
64
65 static const struct lu_object_operations mdd_lu_obj_ops;
66
67 static int mdd_xattr_get(const struct lu_env *env,
68                          struct md_object *obj, struct lu_buf *buf,
69                          const char *name);
70
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
72                  void **data)
73 {
74         if (mdd_object_exists(obj) == 0) {
75                 CERROR("%s: object "DFID" not found: rc = -2\n",
76                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
77                 return -ENOENT;
78         }
79         mdo_data_get(env, obj, data);
80         return 0;
81 }
82
83 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
84                struct lu_attr *la, struct lustre_capa *capa)
85 {
86         if (mdd_object_exists(obj) == 0) {
87                 CERROR("%s: object "DFID" not found: rc = -2\n",
88                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
89                 return -ENOENT;
90         }
91         return mdo_attr_get(env, obj, la, capa);
92 }
93
94 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
95 {
96         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
97
98         if (flags & LUSTRE_APPEND_FL)
99                 obj->mod_flags |= APPEND_OBJ;
100
101         if (flags & LUSTRE_IMMUTABLE_FL)
102                 obj->mod_flags |= IMMUTE_OBJ;
103 }
104
105 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
106 {
107         struct mdd_thread_info *info;
108
109         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
110         LASSERT(info != NULL);
111         return info;
112 }
113
114 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
115 {
116         struct lu_buf *buf;
117
118         buf = &mdd_env_info(env)->mti_buf;
119         buf->lb_buf = area;
120         buf->lb_len = len;
121         return buf;
122 }
123
124 void mdd_buf_put(struct lu_buf *buf)
125 {
126         if (buf == NULL || buf->lb_buf == NULL)
127                 return;
128         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
129         buf->lb_buf = NULL;
130         buf->lb_len = 0;
131 }
132
133 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
134                                        const void *area, ssize_t len)
135 {
136         struct lu_buf *buf;
137
138         buf = &mdd_env_info(env)->mti_buf;
139         buf->lb_buf = (void *)area;
140         buf->lb_len = len;
141         return buf;
142 }
143
144 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
145 {
146         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
147
148         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
149                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
150                 buf->lb_buf = NULL;
151         }
152         if (buf->lb_buf == NULL) {
153                 buf->lb_len = len;
154                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
155                 if (buf->lb_buf == NULL)
156                         buf->lb_len = 0;
157         }
158         return buf;
159 }
160
161 /** Increase the size of the \a mti_big_buf.
162  * preserves old data in buffer
163  * old buffer remains unchanged on error
164  * \retval 0 or -ENOMEM
165  */
166 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
167 {
168         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
169         struct lu_buf buf;
170
171         LASSERT(len >= oldbuf->lb_len);
172         OBD_ALLOC_LARGE(buf.lb_buf, len);
173
174         if (buf.lb_buf == NULL)
175                 return -ENOMEM;
176
177         buf.lb_len = len;
178         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
179
180         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
181
182         memcpy(oldbuf, &buf, sizeof(buf));
183
184         return 0;
185 }
186
187 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
188                                        struct mdd_device *mdd)
189 {
190         struct mdd_thread_info *mti = mdd_env_info(env);
191         int                     max_cookie_size;
192
193         max_cookie_size = mdd_lov_cookiesize(env, mdd);
194         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
195                 if (mti->mti_max_cookie)
196                         OBD_FREE_LARGE(mti->mti_max_cookie,
197                                        mti->mti_max_cookie_size);
198                 mti->mti_max_cookie = NULL;
199                 mti->mti_max_cookie_size = 0;
200         }
201         if (unlikely(mti->mti_max_cookie == NULL)) {
202                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
203                 if (likely(mti->mti_max_cookie != NULL))
204                         mti->mti_max_cookie_size = max_cookie_size;
205         }
206         if (likely(mti->mti_max_cookie != NULL))
207                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208         return mti->mti_max_cookie;
209 }
210
211 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
212                                    struct mdd_device *mdd)
213 {
214         struct mdd_thread_info *mti = mdd_env_info(env);
215         int                     max_lmm_size;
216
217         max_lmm_size = mdd_lov_mdsize(env, mdd);
218         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
219                 if (mti->mti_max_lmm)
220                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
221                 mti->mti_max_lmm = NULL;
222                 mti->mti_max_lmm_size = 0;
223         }
224         if (unlikely(mti->mti_max_lmm == NULL)) {
225                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
226                 if (likely(mti->mti_max_lmm != NULL))
227                         mti->mti_max_lmm_size = max_lmm_size;
228         }
229         return mti->mti_max_lmm;
230 }
231
232 struct lu_object *mdd_object_alloc(const struct lu_env *env,
233                                    const struct lu_object_header *hdr,
234                                    struct lu_device *d)
235 {
236         struct mdd_object *mdd_obj;
237
238         OBD_ALLOC_PTR(mdd_obj);
239         if (mdd_obj != NULL) {
240                 struct lu_object *o;
241
242                 o = mdd2lu_obj(mdd_obj);
243                 lu_object_init(o, NULL, d);
244                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
245                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
246                 mdd_obj->mod_count = 0;
247                 o->lo_ops = &mdd_lu_obj_ops;
248                 return o;
249         } else {
250                 return NULL;
251         }
252 }
253
254 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
255                            const struct lu_object_conf *unused)
256 {
257         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
258         struct mdd_object *mdd_obj = lu2mdd_obj(o);
259         struct lu_object  *below;
260         struct lu_device  *under;
261         ENTRY;
262
263         mdd_obj->mod_cltime = 0;
264         under = &d->mdd_child->dd_lu_dev;
265         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
266         mdd_pdlock_init(mdd_obj);
267         if (below == NULL)
268                 RETURN(-ENOMEM);
269
270         lu_object_add(o, below);
271
272         RETURN(0);
273 }
274
275 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
276 {
277         if (lu_object_exists(o))
278                 return mdd_get_flags(env, lu2mdd_obj(o));
279         else
280                 return 0;
281 }
282
283 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
284 {
285         struct mdd_object *mdd = lu2mdd_obj(o);
286
287         lu_object_fini(o);
288         OBD_FREE_PTR(mdd);
289 }
290
291 static int mdd_object_print(const struct lu_env *env, void *cookie,
292                             lu_printer_t p, const struct lu_object *o)
293 {
294         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
295         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
296                     "valid=%x, cltime="LPU64", flags=%lx)",
297                     mdd, mdd->mod_count, mdd->mod_valid,
298                     mdd->mod_cltime, mdd->mod_flags);
299 }
300
301 static const struct lu_object_operations mdd_lu_obj_ops = {
302         .loo_object_init    = mdd_object_init,
303         .loo_object_start   = mdd_object_start,
304         .loo_object_free    = mdd_object_free,
305         .loo_object_print   = mdd_object_print,
306 };
307
308 struct mdd_object *mdd_object_find(const struct lu_env *env,
309                                    struct mdd_device *d,
310                                    const struct lu_fid *f)
311 {
312         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
313 }
314
315 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
316                         const char *path, struct lu_fid *fid)
317 {
318         struct lu_buf *buf;
319         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
320         struct mdd_object *obj;
321         struct lu_name *lname = &mdd_env_info(env)->mti_name;
322         char *name;
323         int rc = 0;
324         ENTRY;
325
326         /* temp buffer for path element */
327         buf = mdd_buf_alloc(env, PATH_MAX);
328         if (buf->lb_buf == NULL)
329                 RETURN(-ENOMEM);
330
331         lname->ln_name = name = buf->lb_buf;
332         lname->ln_namelen = 0;
333         *f = mdd->mdd_root_fid;
334
335         while(1) {
336                 while (*path == '/')
337                         path++;
338                 if (*path == '\0')
339                         break;
340                 while (*path != '/' && *path != '\0') {
341                         *name = *path;
342                         path++;
343                         name++;
344                         lname->ln_namelen++;
345                 }
346
347                 *name = '\0';
348                 /* find obj corresponding to fid */
349                 obj = mdd_object_find(env, mdd, f);
350                 if (obj == NULL)
351                         GOTO(out, rc = -EREMOTE);
352                 if (IS_ERR(obj))
353                         GOTO(out, rc = PTR_ERR(obj));
354                 /* get child fid from parent and name */
355                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
356                 mdd_object_put(env, obj);
357                 if (rc)
358                         break;
359
360                 name = buf->lb_buf;
361                 lname->ln_namelen = 0;
362         }
363
364         if (!rc)
365                 *fid = *f;
366 out:
367         RETURN(rc);
368 }
369
370 /** The maximum depth that fid2path() will search.
371  * This is limited only because we want to store the fids for
372  * historical path lookup purposes.
373  */
374 #define MAX_PATH_DEPTH 100
375
376 /** mdd_path() lookup structure. */
377 struct path_lookup_info {
378         __u64                pli_recno;        /**< history point */
379         __u64                pli_currec;       /**< current record */
380         struct lu_fid        pli_fid;
381         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
382         struct mdd_object   *pli_mdd_obj;
383         char                *pli_path;         /**< full path */
384         int                  pli_pathlen;
385         int                  pli_linkno;       /**< which hardlink to follow */
386         int                  pli_fidcount;     /**< number of \a pli_fids */
387 };
388
389 static int mdd_path_current(const struct lu_env *env,
390                             struct path_lookup_info *pli)
391 {
392         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
393         struct mdd_object *mdd_obj;
394         struct lu_buf     *buf = NULL;
395         struct link_ea_header *leh;
396         struct link_ea_entry  *lee;
397         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
398         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
399         char *ptr;
400         int reclen;
401         int rc;
402         ENTRY;
403
404         ptr = pli->pli_path + pli->pli_pathlen - 1;
405         *ptr = 0;
406         --ptr;
407         pli->pli_fidcount = 0;
408         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
409
410         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
411                 mdd_obj = mdd_object_find(env, mdd,
412                                           &pli->pli_fids[pli->pli_fidcount]);
413                 if (mdd_obj == NULL)
414                         GOTO(out, rc = -EREMOTE);
415                 if (IS_ERR(mdd_obj))
416                         GOTO(out, rc = PTR_ERR(mdd_obj));
417                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
418                 if (rc <= 0) {
419                         mdd_object_put(env, mdd_obj);
420                         if (rc == -1)
421                                 rc = -EREMOTE;
422                         else if (rc == 0)
423                                 /* Do I need to error out here? */
424                                 rc = -ENOENT;
425                         GOTO(out, rc);
426                 }
427
428                 /* Get parent fid and object name */
429                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
430                 buf = mdd_links_get(env, mdd_obj);
431                 mdd_read_unlock(env, mdd_obj);
432                 mdd_object_put(env, mdd_obj);
433                 if (IS_ERR(buf))
434                         GOTO(out, rc = PTR_ERR(buf));
435
436                 leh = buf->lb_buf;
437                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
438                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
439
440                 /* If set, use link #linkno for path lookup, otherwise use
441                    link #0.  Only do this for the final path element. */
442                 if ((pli->pli_fidcount == 0) &&
443                     (pli->pli_linkno < leh->leh_reccount)) {
444                         int count;
445                         for (count = 0; count < pli->pli_linkno; count++) {
446                                 lee = (struct link_ea_entry *)
447                                      ((char *)lee + reclen);
448                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
449                         }
450                         if (pli->pli_linkno < leh->leh_reccount - 1)
451                                 /* indicate to user there are more links */
452                                 pli->pli_linkno++;
453                 }
454
455                 /* Pack the name in the end of the buffer */
456                 ptr -= tmpname->ln_namelen;
457                 if (ptr - 1 <= pli->pli_path)
458                         GOTO(out, rc = -EOVERFLOW);
459                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
460                 *(--ptr) = '/';
461
462                 /* Store the parent fid for historic lookup */
463                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
464                         GOTO(out, rc = -EOVERFLOW);
465                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
466         }
467
468         /* Verify that our path hasn't changed since we started the lookup.
469            Record the current index, and verify the path resolves to the
470            same fid. If it does, then the path is correct as of this index. */
471         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
472         pli->pli_currec = mdd->mdd_cl.mc_index;
473         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
474         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
475         if (rc) {
476                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
477                 GOTO (out, rc = -EAGAIN);
478         }
479         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
480                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
481                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
482                        PFID(&pli->pli_fid));
483                 GOTO(out, rc = -EAGAIN);
484         }
485         ptr++; /* skip leading / */
486         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
487
488         EXIT;
489 out:
490         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
491                 /* if we vmalloced a large buffer drop it */
492                 mdd_buf_put(buf);
493
494         return rc;
495 }
496
497 static int mdd_path_historic(const struct lu_env *env,
498                              struct path_lookup_info *pli)
499 {
500         return 0;
501 }
502
503 /* Returns the full path to this fid, as of changelog record recno. */
504 static int mdd_path(const struct lu_env *env, struct md_object *obj,
505                     char *path, int pathlen, __u64 *recno, int *linkno)
506 {
507         struct path_lookup_info *pli;
508         int tries = 3;
509         int rc = -EAGAIN;
510         ENTRY;
511
512         if (pathlen < 3)
513                 RETURN(-EOVERFLOW);
514
515         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
516                 path[0] = '\0';
517                 RETURN(0);
518         }
519
520         OBD_ALLOC_PTR(pli);
521         if (pli == NULL)
522                 RETURN(-ENOMEM);
523
524         pli->pli_mdd_obj = md2mdd_obj(obj);
525         pli->pli_recno = *recno;
526         pli->pli_path = path;
527         pli->pli_pathlen = pathlen;
528         pli->pli_linkno = *linkno;
529
530         /* Retry multiple times in case file is being moved */
531         while (tries-- && rc == -EAGAIN)
532                 rc = mdd_path_current(env, pli);
533
534         /* For historical path lookup, the current links may not have existed
535          * at "recno" time.  We must switch over to earlier links/parents
536          * by using the changelog records.  If the earlier parent doesn't
537          * exist, we must search back through the changelog to reconstruct
538          * its parents, then check if it exists, etc.
539          * We may ignore this problem for the initial implementation and
540          * state that an "original" hardlink must still exist for us to find
541          * historic path name. */
542         if (pli->pli_recno != -1) {
543                 rc = mdd_path_historic(env, pli);
544         } else {
545                 *recno = pli->pli_currec;
546                 /* Return next link index to caller */
547                 *linkno = pli->pli_linkno;
548         }
549
550         OBD_FREE_PTR(pli);
551
552         RETURN (rc);
553 }
554
555 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
556 {
557         struct lu_attr *la = &mdd_env_info(env)->mti_la;
558         int rc;
559
560         ENTRY;
561         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
562         if (rc == 0) {
563                 mdd_flags_xlate(obj, la->la_flags);
564         }
565         RETURN(rc);
566 }
567
568 /* get only inode attributes */
569 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
570                   struct md_attr *ma)
571 {
572         int rc = 0;
573         ENTRY;
574
575         if (ma->ma_valid & MA_INODE)
576                 RETURN(0);
577
578         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
579                           mdd_object_capa(env, mdd_obj));
580         if (rc == 0)
581                 ma->ma_valid |= MA_INODE;
582         RETURN(rc);
583 }
584
585 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
586 {
587         struct lov_desc *ldesc;
588         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
589         struct lov_user_md *lum = (struct lov_user_md*)lmm;
590         ENTRY;
591
592         if (!lum)
593                 RETURN(0);
594
595         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
596         LASSERT(ldesc != NULL);
597
598         lum->lmm_magic = LOV_MAGIC_V1;
599         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
600         lum->lmm_pattern = ldesc->ld_pattern;
601         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
602         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
603         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
604
605         RETURN(sizeof(*lum));
606 }
607
608 static int is_rootdir(struct mdd_object *mdd_obj)
609 {
610         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
611         const struct lu_fid *fid = mdo2fid(mdd_obj);
612
613         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
614 }
615
616 /* get lov EA only */
617 static int __mdd_lmm_get(const struct lu_env *env,
618                          struct mdd_object *mdd_obj, struct md_attr *ma)
619 {
620         int rc;
621         ENTRY;
622
623         if (ma->ma_valid & MA_LOV)
624                 RETURN(0);
625
626         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
627                         XATTR_NAME_LOV);
628         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
629                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
630         if (rc > 0) {
631                 ma->ma_lmm_size = rc;
632                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
633                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
634                 rc = 0;
635         }
636         RETURN(rc);
637 }
638
639 /* get the first parent fid from link EA */
640 static int mdd_pfid_get(const struct lu_env *env,
641                         struct mdd_object *mdd_obj, struct md_attr *ma)
642 {
643         struct lu_buf *buf;
644         struct link_ea_header *leh;
645         struct link_ea_entry *lee;
646         struct lu_fid *pfid = &ma->ma_pfid;
647         ENTRY;
648
649         if (ma->ma_valid & MA_PFID)
650                 RETURN(0);
651
652         buf = mdd_links_get(env, mdd_obj);
653         if (IS_ERR(buf))
654                 RETURN(PTR_ERR(buf));
655
656         leh = buf->lb_buf;
657         lee = (struct link_ea_entry *)(leh + 1);
658         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
659         fid_be_to_cpu(pfid, pfid);
660         ma->ma_valid |= MA_PFID;
661         if (buf->lb_len > OBD_ALLOC_BIG)
662                 /* if we vmalloced a large buffer drop it */
663                 mdd_buf_put(buf);
664         RETURN(0);
665 }
666
667 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
668                        struct md_attr *ma)
669 {
670         int rc;
671         ENTRY;
672
673         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
674         rc = __mdd_lmm_get(env, mdd_obj, ma);
675         mdd_read_unlock(env, mdd_obj);
676         RETURN(rc);
677 }
678
679 /* get lmv EA only*/
680 static int __mdd_lmv_get(const struct lu_env *env,
681                          struct mdd_object *mdd_obj, struct md_attr *ma)
682 {
683         int rc;
684         ENTRY;
685
686         if (ma->ma_valid & MA_LMV)
687                 RETURN(0);
688
689         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
690                         XATTR_NAME_LMV);
691         if (rc > 0) {
692                 ma->ma_valid |= MA_LMV;
693                 rc = 0;
694         }
695         RETURN(rc);
696 }
697
698 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
699                          struct md_attr *ma)
700 {
701         struct mdd_thread_info *info = mdd_env_info(env);
702         struct lustre_mdt_attrs *lma =
703                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
704         int lma_size;
705         int rc;
706         ENTRY;
707
708         /* If all needed data are already valid, nothing to do */
709         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
710             (ma->ma_need & (MA_HSM | MA_SOM)))
711                 RETURN(0);
712
713         /* Read LMA from disk EA */
714         lma_size = sizeof(info->mti_xattr_buf);
715         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
716         if (rc <= 0)
717                 RETURN(rc);
718
719         /* Useless to check LMA incompatibility because this is already done in
720          * osd_ea_fid_get(), and this will fail long before this code is
721          * called.
722          * So, if we are here, LMA is compatible.
723          */
724
725         lustre_lma_swab(lma);
726
727         /* Swab and copy LMA */
728         if (ma->ma_need & MA_HSM) {
729                 if (lma->lma_compat & LMAC_HSM)
730                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
731                 else
732                         ma->ma_hsm.mh_flags = 0;
733                 ma->ma_valid |= MA_HSM;
734         }
735
736         /* Copy SOM */
737         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
738                 LASSERT(ma->ma_som != NULL);
739                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
740                 ma->ma_som->msd_size    = lma->lma_som_size;
741                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
742                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
743                 ma->ma_valid |= MA_SOM;
744         }
745
746         RETURN(0);
747 }
748
749 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
750                                  struct md_attr *ma)
751 {
752         int rc = 0;
753         ENTRY;
754
755         if (ma->ma_need & MA_INODE)
756                 rc = mdd_iattr_get(env, mdd_obj, ma);
757
758         if (rc == 0 && ma->ma_need & MA_LOV) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
760                     S_ISDIR(mdd_object_type(mdd_obj)))
761                         rc = __mdd_lmm_get(env, mdd_obj, ma);
762         }
763         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
764                 if (S_ISREG(mdd_object_type(mdd_obj)))
765                         rc = mdd_pfid_get(env, mdd_obj, ma);
766         }
767         if (rc == 0 && ma->ma_need & MA_LMV) {
768                 if (S_ISDIR(mdd_object_type(mdd_obj)))
769                         rc = __mdd_lmv_get(env, mdd_obj, ma);
770         }
771         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
772                 if (S_ISREG(mdd_object_type(mdd_obj)))
773                         rc = __mdd_lma_get(env, mdd_obj, ma);
774         }
775 #ifdef CONFIG_FS_POSIX_ACL
776         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
777                 if (S_ISDIR(mdd_object_type(mdd_obj)))
778                         rc = mdd_def_acl_get(env, mdd_obj, ma);
779         }
780 #endif
781         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
782                rc, ma->ma_valid, ma->ma_lmm);
783         RETURN(rc);
784 }
785
786 int mdd_attr_get_internal_locked(const struct lu_env *env,
787                                  struct mdd_object *mdd_obj, struct md_attr *ma)
788 {
789         int rc;
790         int needlock = ma->ma_need &
791                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
792
793         if (needlock)
794                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
795         rc = mdd_attr_get_internal(env, mdd_obj, ma);
796         if (needlock)
797                 mdd_read_unlock(env, mdd_obj);
798         return rc;
799 }
800
801 /*
802  * No permission check is needed.
803  */
804 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
805                         struct md_attr *ma)
806 {
807         struct mdd_object *mdd_obj = md2mdd_obj(obj);
808         int                rc;
809
810         ENTRY;
811         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
812         RETURN(rc);
813 }
814
815 /*
816  * No permission check is needed.
817  */
818 static int mdd_xattr_get(const struct lu_env *env,
819                          struct md_object *obj, struct lu_buf *buf,
820                          const char *name)
821 {
822         struct mdd_object *mdd_obj = md2mdd_obj(obj);
823         int rc;
824
825         ENTRY;
826
827         if (mdd_object_exists(mdd_obj) == 0) {
828                 CERROR("%s: object "DFID" not found: rc = -2\n",
829                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
830                 return -ENOENT;
831         }
832
833         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834         rc = mdo_xattr_get(env, mdd_obj, buf, name,
835                            mdd_object_capa(env, mdd_obj));
836         mdd_read_unlock(env, mdd_obj);
837
838         RETURN(rc);
839 }
840
841 /*
842  * Permission check is done when open,
843  * no need check again.
844  */
845 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
846                         struct lu_buf *buf)
847 {
848         struct mdd_object *mdd_obj = md2mdd_obj(obj);
849         struct dt_object  *next;
850         loff_t             pos = 0;
851         int                rc;
852         ENTRY;
853
854         if (mdd_object_exists(mdd_obj) == 0) {
855                 CERROR("%s: object "DFID" not found: rc = -2\n",
856                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
857                 return -ENOENT;
858         }
859
860         next = mdd_object_child(mdd_obj);
861         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
862         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
863                                          mdd_object_capa(env, mdd_obj));
864         mdd_read_unlock(env, mdd_obj);
865         RETURN(rc);
866 }
867
868 /*
869  * No permission check is needed.
870  */
871 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
872                           struct lu_buf *buf)
873 {
874         struct mdd_object *mdd_obj = md2mdd_obj(obj);
875         int rc;
876
877         ENTRY;
878
879         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
880         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
881         mdd_read_unlock(env, mdd_obj);
882
883         RETURN(rc);
884 }
885
886 int mdd_declare_object_create_internal(const struct lu_env *env,
887                                        struct mdd_object *p,
888                                        struct mdd_object *c,
889                                        struct md_attr *ma,
890                                        struct thandle *handle,
891                                        const struct md_op_spec *spec)
892 {
893         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
894         const struct dt_index_features *feat = spec->sp_feat;
895         int rc;
896         ENTRY;
897
898         if (feat != &dt_directory_features && feat != NULL)
899                 dof->dof_type = DFT_INDEX;
900         else
901                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
902
903         dof->u.dof_idx.di_feat = feat;
904
905         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
906
907         RETURN(rc);
908 }
909
910 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
911                                struct mdd_object *c, struct md_attr *ma,
912                                struct thandle *handle,
913                                const struct md_op_spec *spec)
914 {
915         struct lu_attr *attr = &ma->ma_attr;
916         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
917         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
918         const struct dt_index_features *feat = spec->sp_feat;
919         int rc;
920         ENTRY;
921
922         if (!mdd_object_exists(c)) {
923                 struct dt_object *next = mdd_object_child(c);
924                 LASSERT(next);
925
926                 if (feat != &dt_directory_features && feat != NULL)
927                         dof->dof_type = DFT_INDEX;
928                 else
929                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
930
931                 dof->u.dof_idx.di_feat = feat;
932
933                 /* @hint will be initialized by underlying device. */
934                 next->do_ops->do_ah_init(env, hint,
935                                          p ? mdd_object_child(p) : NULL,
936                                          attr->la_mode & S_IFMT);
937
938                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
939                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
940         } else
941                 rc = -EEXIST;
942
943         RETURN(rc);
944 }
945
946 /**
947  * Make sure the ctime is increased only.
948  */
949 static inline int mdd_attr_check(const struct lu_env *env,
950                                  struct mdd_object *obj,
951                                  struct lu_attr *attr)
952 {
953         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
954         int rc;
955         ENTRY;
956
957         if (attr->la_valid & LA_CTIME) {
958                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
959                 if (rc)
960                         RETURN(rc);
961
962                 if (attr->la_ctime < tmp_la->la_ctime)
963                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
964                 else if (attr->la_valid == LA_CTIME &&
965                          attr->la_ctime == tmp_la->la_ctime)
966                         attr->la_valid &= ~LA_CTIME;
967         }
968         RETURN(0);
969 }
970
971 int mdd_attr_set_internal(const struct lu_env *env,
972                           struct mdd_object *obj,
973                           struct lu_attr *attr,
974                           struct thandle *handle,
975                           int needacl)
976 {
977         int rc;
978         ENTRY;
979
980         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
981 #ifdef CONFIG_FS_POSIX_ACL
982         if (!rc && (attr->la_valid & LA_MODE) && needacl)
983                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
984 #endif
985         RETURN(rc);
986 }
987
988 int mdd_attr_check_set_internal(const struct lu_env *env,
989                                 struct mdd_object *obj,
990                                 struct lu_attr *attr,
991                                 struct thandle *handle,
992                                 int needacl)
993 {
994         int rc;
995         ENTRY;
996
997         rc = mdd_attr_check(env, obj, attr);
998         if (rc)
999                 RETURN(rc);
1000
1001         if (attr->la_valid)
1002                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1003         RETURN(rc);
1004 }
1005
1006 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1007                                         struct mdd_object *obj,
1008                                         struct lu_attr *attr,
1009                                         struct thandle *handle,
1010                                         int needacl)
1011 {
1012         int rc;
1013         ENTRY;
1014
1015         needacl = needacl && (attr->la_valid & LA_MODE);
1016         if (needacl)
1017                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1018         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1019         if (needacl)
1020                 mdd_write_unlock(env, obj);
1021         RETURN(rc);
1022 }
1023
1024 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1025                                        struct mdd_object *obj,
1026                                        struct lu_attr *attr,
1027                                        struct thandle *handle,
1028                                        int needacl)
1029 {
1030         int rc;
1031         ENTRY;
1032
1033         needacl = needacl && (attr->la_valid & LA_MODE);
1034         if (needacl)
1035                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1036         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1037         if (needacl)
1038                 mdd_write_unlock(env, obj);
1039         RETURN(rc);
1040 }
1041
1042 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1043                     const struct lu_buf *buf, const char *name,
1044                     int fl, struct thandle *handle)
1045 {
1046         struct lustre_capa *capa = mdd_object_capa(env, obj);
1047         int rc = -EINVAL;
1048         ENTRY;
1049
1050         if (buf->lb_buf && buf->lb_len > 0)
1051                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1052         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1053                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1054
1055         RETURN(rc);
1056 }
1057
1058 /*
1059  * This gives the same functionality as the code between
1060  * sys_chmod and inode_setattr
1061  * chown_common and inode_setattr
1062  * utimes and inode_setattr
1063  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1064  */
1065 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1066                         struct lu_attr *la, const struct md_attr *ma)
1067 {
1068         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1069         struct md_ucred  *uc;
1070         int               rc;
1071         ENTRY;
1072
1073         if (!la->la_valid)
1074                 RETURN(0);
1075
1076         /* Do not permit change file type */
1077         if (la->la_valid & LA_TYPE)
1078                 RETURN(-EPERM);
1079
1080         /* They should not be processed by setattr */
1081         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1082                 RETURN(-EPERM);
1083
1084         /* export destroy does not have ->le_ses, but we may want
1085          * to drop LUSTRE_SOM_FL. */
1086         if (!env->le_ses)
1087                 RETURN(0);
1088
1089         uc = md_ucred(env);
1090
1091         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1092         if (rc)
1093                 RETURN(rc);
1094
1095         if (la->la_valid == LA_CTIME) {
1096                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1097                         /* This is only for set ctime when rename's source is
1098                          * on remote MDS. */
1099                         rc = mdd_may_delete(env, NULL, obj,
1100                                             (struct md_attr *)ma, 1, 0);
1101                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1102                         la->la_valid &= ~LA_CTIME;
1103                 RETURN(rc);
1104         }
1105
1106         if (la->la_valid == LA_ATIME) {
1107                 /* This is atime only set for read atime update on close. */
1108                 if (la->la_atime >= tmp_la->la_atime &&
1109                     la->la_atime < (tmp_la->la_atime +
1110                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1111                         la->la_valid &= ~LA_ATIME;
1112                 RETURN(0);
1113         }
1114
1115         /* Check if flags change. */
1116         if (la->la_valid & LA_FLAGS) {
1117                 unsigned int oldflags = 0;
1118                 unsigned int newflags = la->la_flags &
1119                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1120
1121                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1122                     !mdd_capable(uc, CFS_CAP_FOWNER))
1123                         RETURN(-EPERM);
1124
1125                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1126                  * only be changed by the relevant capability. */
1127                 if (mdd_is_immutable(obj))
1128                         oldflags |= LUSTRE_IMMUTABLE_FL;
1129                 if (mdd_is_append(obj))
1130                         oldflags |= LUSTRE_APPEND_FL;
1131                 if ((oldflags ^ newflags) &&
1132                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1133                         RETURN(-EPERM);
1134
1135                 if (!S_ISDIR(tmp_la->la_mode))
1136                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1137         }
1138
1139         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1140             (la->la_valid & ~LA_FLAGS) &&
1141             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1142                 RETURN(-EPERM);
1143
1144         /* Check for setting the obj time. */
1145         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1146             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1147                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1148                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1149                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1150                                                             MAY_WRITE,
1151                                                             MOR_TGT_CHILD);
1152                         if (rc)
1153                                 RETURN(rc);
1154                 }
1155         }
1156
1157         if (la->la_valid & LA_KILL_SUID) {
1158                 la->la_valid &= ~LA_KILL_SUID;
1159                 if ((tmp_la->la_mode & S_ISUID) &&
1160                     !(la->la_valid & LA_MODE)) {
1161                         la->la_mode = tmp_la->la_mode;
1162                         la->la_valid |= LA_MODE;
1163                 }
1164                 la->la_mode &= ~S_ISUID;
1165         }
1166
1167         if (la->la_valid & LA_KILL_SGID) {
1168                 la->la_valid &= ~LA_KILL_SGID;
1169                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1170                                         (S_ISGID | S_IXGRP)) &&
1171                     !(la->la_valid & LA_MODE)) {
1172                         la->la_mode = tmp_la->la_mode;
1173                         la->la_valid |= LA_MODE;
1174                 }
1175                 la->la_mode &= ~S_ISGID;
1176         }
1177
1178         /* Make sure a caller can chmod. */
1179         if (la->la_valid & LA_MODE) {
1180                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1181                     (uc->mu_fsuid != tmp_la->la_uid) &&
1182                     !mdd_capable(uc, CFS_CAP_FOWNER))
1183                         RETURN(-EPERM);
1184
1185                 if (la->la_mode == (cfs_umode_t) -1)
1186                         la->la_mode = tmp_la->la_mode;
1187                 else
1188                         la->la_mode = (la->la_mode & S_IALLUGO) |
1189                                       (tmp_la->la_mode & ~S_IALLUGO);
1190
1191                 /* Also check the setgid bit! */
1192                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1193                                        la->la_gid : tmp_la->la_gid) &&
1194                     !mdd_capable(uc, CFS_CAP_FSETID))
1195                         la->la_mode &= ~S_ISGID;
1196         } else {
1197                la->la_mode = tmp_la->la_mode;
1198         }
1199
1200         /* Make sure a caller can chown. */
1201         if (la->la_valid & LA_UID) {
1202                 if (la->la_uid == (uid_t) -1)
1203                         la->la_uid = tmp_la->la_uid;
1204                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1205                     (la->la_uid != tmp_la->la_uid)) &&
1206                     !mdd_capable(uc, CFS_CAP_CHOWN))
1207                         RETURN(-EPERM);
1208
1209                 /* If the user or group of a non-directory has been
1210                  * changed by a non-root user, remove the setuid bit.
1211                  * 19981026 David C Niemi <niemi@tux.org>
1212                  *
1213                  * Changed this to apply to all users, including root,
1214                  * to avoid some races. This is the behavior we had in
1215                  * 2.0. The check for non-root was definitely wrong
1216                  * for 2.2 anyway, as it should have been using
1217                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1218                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1219                     !S_ISDIR(tmp_la->la_mode)) {
1220                         la->la_mode &= ~S_ISUID;
1221                         la->la_valid |= LA_MODE;
1222                 }
1223         }
1224
1225         /* Make sure caller can chgrp. */
1226         if (la->la_valid & LA_GID) {
1227                 if (la->la_gid == (gid_t) -1)
1228                         la->la_gid = tmp_la->la_gid;
1229                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1230                     ((la->la_gid != tmp_la->la_gid) &&
1231                     !lustre_in_group_p(uc, la->la_gid))) &&
1232                     !mdd_capable(uc, CFS_CAP_CHOWN))
1233                         RETURN(-EPERM);
1234
1235                 /* Likewise, if the user or group of a non-directory
1236                  * has been changed by a non-root user, remove the
1237                  * setgid bit UNLESS there is no group execute bit
1238                  * (this would be a file marked for mandatory
1239                  * locking).  19981026 David C Niemi <niemi@tux.org>
1240                  *
1241                  * Removed the fsuid check (see the comment above) --
1242                  * 19990830 SD. */
1243                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1244                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1245                         la->la_mode &= ~S_ISGID;
1246                         la->la_valid |= LA_MODE;
1247                 }
1248         }
1249
1250         /* For both Size-on-MDS case and truncate case,
1251          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1252          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1253          * For SOM case, it is true, the MAY_WRITE perm has been checked
1254          * when open, no need check again. For truncate case, it is false,
1255          * the MAY_WRITE perm should be checked here. */
1256         if (ma->ma_attr_flags & MDS_SOM) {
1257                 /* For the "Size-on-MDS" setattr update, merge coming
1258                  * attributes with the set in the inode. BUG 10641 */
1259                 if ((la->la_valid & LA_ATIME) &&
1260                     (la->la_atime <= tmp_la->la_atime))
1261                         la->la_valid &= ~LA_ATIME;
1262
1263                 /* OST attributes do not have a priority over MDS attributes,
1264                  * so drop times if ctime is equal. */
1265                 if ((la->la_valid & LA_CTIME) &&
1266                     (la->la_ctime <= tmp_la->la_ctime))
1267                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1268         } else {
1269                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1270                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1271                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1272                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1273                                 rc = mdd_permission_internal_locked(env, obj,
1274                                                             tmp_la, MAY_WRITE,
1275                                                             MOR_TGT_CHILD);
1276                                 if (rc)
1277                                         RETURN(rc);
1278                         }
1279                 }
1280                 if (la->la_valid & LA_CTIME) {
1281                         /* The pure setattr, it has the priority over what is
1282                          * already set, do not drop it if ctime is equal. */
1283                         if (la->la_ctime < tmp_la->la_ctime)
1284                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1285                                                   LA_CTIME);
1286                 }
1287         }
1288
1289         RETURN(0);
1290 }
1291
1292 /** Store a data change changelog record
1293  * If this fails, we must fail the whole transaction; we don't
1294  * want the change to commit without the log entry.
1295  * \param mdd_obj - mdd_object of change
1296  * \param handle - transacion handle
1297  */
1298 static int mdd_changelog_data_store(const struct lu_env     *env,
1299                                     struct mdd_device       *mdd,
1300                                     enum changelog_rec_type type,
1301                                     int                     flags,
1302                                     struct mdd_object       *mdd_obj,
1303                                     struct thandle          *handle)
1304 {
1305         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1306         struct llog_changelog_rec *rec;
1307         struct thandle *th = NULL;
1308         struct lu_buf *buf;
1309         int reclen;
1310         int rc;
1311
1312         /* Not recording */
1313         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1314                 RETURN(0);
1315         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1316                 RETURN(0);
1317
1318         LASSERT(mdd_obj != NULL);
1319         LASSERT(handle != NULL);
1320
1321         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1322             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1323                 /* Don't need multiple updates in this log */
1324                 /* Don't check under lock - no big deal if we get an extra
1325                    entry */
1326                 RETURN(0);
1327         }
1328
1329         reclen = llog_data_len(sizeof(*rec));
1330         buf = mdd_buf_alloc(env, reclen);
1331         if (buf->lb_buf == NULL)
1332                 RETURN(-ENOMEM);
1333         rec = (struct llog_changelog_rec *)buf->lb_buf;
1334
1335         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1336         rec->cr.cr_type = (__u32)type;
1337         rec->cr.cr_tfid = *tfid;
1338         rec->cr.cr_namelen = 0;
1339         mdd_obj->mod_cltime = cfs_time_current_64();
1340
1341         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1342
1343         if (th)
1344                 mdd_trans_stop(env, mdd, rc, th);
1345
1346         if (rc < 0) {
1347                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1348                        rc, type, PFID(tfid));
1349                 return -EFAULT;
1350         }
1351
1352         return 0;
1353 }
1354
1355 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1356                   int flags, struct md_object *obj)
1357 {
1358         struct thandle *handle;
1359         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1360         struct mdd_device *mdd = mdo2mdd(obj);
1361         int rc;
1362         ENTRY;
1363
1364         handle = mdd_trans_create(env, mdd);
1365         if (IS_ERR(handle))
1366                 return(PTR_ERR(handle));
1367
1368         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1369         if (rc)
1370                 GOTO(stop, rc);
1371
1372         rc = mdd_trans_start(env, mdd, handle);
1373         if (rc)
1374                 GOTO(stop, rc);
1375
1376         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1377                                       handle);
1378
1379 stop:
1380         mdd_trans_stop(env, mdd, rc, handle);
1381
1382         RETURN(rc);
1383 }
1384
1385 /**
1386  * Should be called with write lock held.
1387  *
1388  * \see mdd_lma_set_locked().
1389  */
1390 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1391                        const struct md_attr *ma, struct thandle *handle)
1392 {
1393         struct mdd_thread_info *info = mdd_env_info(env);
1394         struct lu_buf *buf;
1395         struct lustre_mdt_attrs *lma =
1396                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1397         int lmasize = sizeof(struct lustre_mdt_attrs);
1398         int rc = 0;
1399
1400         ENTRY;
1401
1402         /* Either HSM or SOM part is not valid, we need to read it before */
1403         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1404                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1405                 if (rc <= 0)
1406                         RETURN(rc);
1407
1408                 lustre_lma_swab(lma);
1409         } else {
1410                 memset(lma, 0, lmasize);
1411         }
1412
1413         /* Copy HSM data */
1414         if (ma->ma_valid & MA_HSM) {
1415                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1416                 lma->lma_compat |= LMAC_HSM;
1417         }
1418
1419         /* Copy SOM data */
1420         if (ma->ma_valid & MA_SOM) {
1421                 LASSERT(ma->ma_som != NULL);
1422                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1423                         lma->lma_compat     &= ~LMAC_SOM;
1424                 } else {
1425                         lma->lma_compat     |= LMAC_SOM;
1426                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1427                         lma->lma_som_size    = ma->ma_som->msd_size;
1428                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1429                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1430                 }
1431         }
1432
1433         /* Copy FID */
1434         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1435
1436         lustre_lma_swab(lma);
1437         buf = mdd_buf_get(env, lma, lmasize);
1438         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1439
1440         RETURN(rc);
1441 }
1442
1443 /**
1444  * Save LMA extended attributes with data from \a ma.
1445  *
1446  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1447  * not, LMA EA will be first read from disk, modified and write back.
1448  *
1449  */
1450 static int mdd_lma_set_locked(const struct lu_env *env,
1451                               struct mdd_object *mdd_obj,
1452                               const struct md_attr *ma, struct thandle *handle)
1453 {
1454         int rc;
1455
1456         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1457         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1458         mdd_write_unlock(env, mdd_obj);
1459         return rc;
1460 }
1461
1462 /* Precedence for choosing record type when multiple
1463  * attributes change: setattr > mtime > ctime > atime
1464  * (ctime changes when mtime does, plus chmod/chown.
1465  * atime and ctime are independent.) */
1466 static int mdd_attr_set_changelog(const struct lu_env *env,
1467                                   struct md_object *obj, struct thandle *handle,
1468                                   __u64 valid)
1469 {
1470         struct mdd_device *mdd = mdo2mdd(obj);
1471         int bits, type = 0;
1472
1473         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1474         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1475         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1476         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1477         bits = bits & mdd->mdd_cl.mc_mask;
1478         if (bits == 0)
1479                 return 0;
1480
1481         /* The record type is the lowest non-masked set bit */
1482         while (bits && ((bits & 1) == 0)) {
1483                 bits = bits >> 1;
1484                 type++;
1485         }
1486
1487         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1488         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1489                                         md2mdd_obj(obj), handle);
1490 }
1491
1492 static int mdd_declare_attr_set(const struct lu_env *env,
1493                                 struct mdd_device *mdd,
1494                                 struct mdd_object *obj,
1495                                 const struct md_attr *ma,
1496                                 struct lov_mds_md *lmm,
1497                                 struct thandle *handle)
1498 {
1499         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1500         int             rc, i;
1501
1502         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1503         if (rc)
1504                 return rc;
1505
1506         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1507         if (rc)
1508                 return rc;
1509
1510         if (ma->ma_valid & MA_LOV) {
1511                 buf->lb_buf = NULL;
1512                 buf->lb_len = ma->ma_lmm_size;
1513                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1514                                            0, handle);
1515                 if (rc)
1516                         return rc;
1517         }
1518
1519         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1520                 buf->lb_buf = NULL;
1521                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1522                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1523                                            0, handle);
1524                 if (rc)
1525                         return rc;
1526         }
1527
1528 #ifdef CONFIG_FS_POSIX_ACL
1529         if (ma->ma_attr.la_valid & LA_MODE) {
1530                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1531                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1532                                    BYPASS_CAPA);
1533                 mdd_read_unlock(env, obj);
1534                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1535                         rc = 0;
1536                 else if (rc < 0)
1537                         return rc;
1538
1539                 if (rc != 0) {
1540                         buf->lb_buf = NULL;
1541                         buf->lb_len = rc;
1542                         rc = mdo_declare_xattr_set(env, obj, buf,
1543                                                    XATTR_NAME_ACL_ACCESS, 0,
1544                                                    handle);
1545                         if (rc)
1546                                 return rc;
1547                 }
1548         }
1549 #endif
1550
1551         /* basically the log is the same as in unlink case */
1552         if (lmm) {
1553                 __u16 stripe;
1554
1555                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1556                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1557                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1558                                mdd->mdd_obd_dev->obd_name,
1559                                le32_to_cpu(lmm->lmm_magic),
1560                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1561                         return -EINVAL;
1562                 }
1563
1564                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1565                 if (stripe == LOV_ALL_STRIPES) {
1566                         struct lov_desc *ldesc;
1567
1568                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1569                         LASSERT(ldesc != NULL);
1570                         stripe = ldesc->ld_tgt_count;
1571                 }
1572
1573                 for (i = 0; i < stripe; i++) {
1574                         rc = mdd_declare_llog_record(env, mdd,
1575                                         sizeof(struct llog_unlink_rec),
1576                                         handle);
1577                         if (rc)
1578                                 return rc;
1579                 }
1580         }
1581
1582         return rc;
1583 }
1584
1585 /* set attr and LOV EA at once, return updated attr */
1586 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1587                         const struct md_attr *ma)
1588 {
1589         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590         struct mdd_device *mdd = mdo2mdd(obj);
1591         struct thandle *handle;
1592         struct lov_mds_md *lmm = NULL;
1593         struct llog_cookie *logcookies = NULL;
1594         int  rc, lmm_size = 0, cookie_size = 0;
1595         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1596         struct obd_device *obd = mdd->mdd_obd_dev;
1597         struct mds_obd *mds = &obd->u.mds;
1598 #ifdef HAVE_QUOTA_SUPPORT
1599         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1600         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1601         int quota_opc = 0, block_count = 0;
1602         int inode_pending[MAXQUOTAS] = { 0, 0 };
1603         int block_pending[MAXQUOTAS] = { 0, 0 };
1604 #endif
1605         ENTRY;
1606
1607         *la_copy = ma->ma_attr;
1608         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1609         if (rc != 0)
1610                 RETURN(rc);
1611
1612         /* setattr on "close" only change atime, or do nothing */
1613         if (ma->ma_valid == MA_INODE &&
1614             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1615                 RETURN(0);
1616
1617         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1618             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1619                 lmm_size = mdd_lov_mdsize(env, mdd);
1620                 lmm = mdd_max_lmm_get(env, mdd);
1621                 if (lmm == NULL)
1622                         RETURN(-ENOMEM);
1623
1624                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1625                                 XATTR_NAME_LOV);
1626
1627                 if (rc < 0)
1628                         RETURN(rc);
1629         }
1630
1631         handle = mdd_trans_create(env, mdd);
1632         if (IS_ERR(handle))
1633                 RETURN(PTR_ERR(handle));
1634
1635         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1636                                   lmm_size > 0 ? lmm : NULL, handle);
1637         if (rc)
1638                 GOTO(stop, rc);
1639
1640         /* permission changes may require sync operation */
1641         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1642                 handle->th_sync = !!mdd->mdd_sync_permission;
1643
1644         rc = mdd_trans_start(env, mdd, handle);
1645         if (rc)
1646                 GOTO(stop, rc);
1647
1648         /* permission changes may require sync operation */
1649         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1650                 handle->th_sync |= mdd->mdd_sync_permission;
1651
1652         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1653                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1654                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1655
1656 #ifdef HAVE_QUOTA_SUPPORT
1657         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1658                 struct obd_export *exp = md_quota(env)->mq_exp;
1659                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1660
1661                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1662                 if (!rc) {
1663                         quota_opc = FSFILT_OP_SETATTR;
1664                         mdd_quota_wrapper(la_copy, qnids);
1665                         mdd_quota_wrapper(la_tmp, qoids);
1666                         /* get file quota for new owner */
1667                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1668                                         qnids, inode_pending, 1, NULL, 0,
1669                                         NULL, 0);
1670                         block_count = (la_tmp->la_blocks + 7) >> 3;
1671                         if (block_count) {
1672                                 void *data = NULL;
1673                                 mdd_data_get(env, mdd_obj, &data);
1674                                 /* get block quota for new owner */
1675                                 lquota_chkquota(mds_quota_interface_ref, obd,
1676                                                 exp, qnids, block_pending,
1677                                                 block_count, NULL,
1678                                                 LQUOTA_FLAGS_BLK, data, 1);
1679                         }
1680                 }
1681         }
1682 #endif
1683
1684         if (la_copy->la_valid & LA_FLAGS) {
1685                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1686                                                   handle, 1);
1687                 if (rc == 0)
1688                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1689         } else if (la_copy->la_valid) {            /* setattr */
1690                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1691                                                   handle, 1);
1692                 /* journal chown/chgrp in llog, just like unlink */
1693                 if (rc == 0 && lmm_size){
1694                         cookie_size = mdd_lov_cookiesize(env, mdd);
1695                         logcookies = mdd_max_cookie_get(env, mdd);
1696                         if (logcookies == NULL)
1697                                 GOTO(cleanup, rc = -ENOMEM);
1698
1699                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1700                                             logcookies, cookie_size) <= 0)
1701                                 logcookies = NULL;
1702                 }
1703         }
1704
1705         if (rc == 0 && ma->ma_valid & MA_LOV) {
1706                 cfs_umode_t mode;
1707
1708                 mode = mdd_object_type(mdd_obj);
1709                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1710                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1711                         if (rc)
1712                                 GOTO(cleanup, rc);
1713
1714                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1715                                             ma->ma_lmm_size, handle, 1);
1716                 }
1717
1718         }
1719         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1720                 cfs_umode_t mode;
1721
1722                 mode = mdd_object_type(mdd_obj);
1723                 if (S_ISREG(mode))
1724                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1725
1726         }
1727 cleanup:
1728         if (rc == 0)
1729                 rc = mdd_attr_set_changelog(env, obj, handle,
1730                                             ma->ma_attr.la_valid);
1731 stop:
1732         mdd_trans_stop(env, mdd, rc, handle);
1733         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1734                 /*set obd attr, if needed*/
1735                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1736                                            logcookies);
1737         }
1738 #ifdef HAVE_QUOTA_SUPPORT
1739         if (quota_opc) {
1740                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1741                                       inode_pending, 0);
1742                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1743                                       block_pending, 1);
1744                 /* Trigger dqrel/dqacq for original owner and new owner.
1745                  * If failed, the next call for lquota_chkquota will
1746                  * process it. */
1747                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1748                               quota_opc);
1749         }
1750 #endif
1751         RETURN(rc);
1752 }
1753
1754 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1755                       const struct lu_buf *buf, const char *name, int fl,
1756                       struct thandle *handle)
1757 {
1758         int  rc;
1759         ENTRY;
1760
1761         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1762         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1763         mdd_write_unlock(env, obj);
1764
1765         RETURN(rc);
1766 }
1767
1768 static int mdd_xattr_sanity_check(const struct lu_env *env,
1769                                   struct mdd_object *obj)
1770 {
1771         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1772         struct md_ucred *uc     = md_ucred(env);
1773         int rc;
1774         ENTRY;
1775
1776         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1777                 RETURN(-EPERM);
1778
1779         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1780         if (rc)
1781                 RETURN(rc);
1782
1783         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1784             !mdd_capable(uc, CFS_CAP_FOWNER))
1785                 RETURN(-EPERM);
1786
1787         RETURN(rc);
1788 }
1789
1790 static int mdd_declare_xattr_set(const struct lu_env *env,
1791                                  struct mdd_device *mdd,
1792                                  struct mdd_object *obj,
1793                                  const struct lu_buf *buf,
1794                                  const char *name,
1795                                  struct thandle *handle)
1796
1797 {
1798         int rc;
1799
1800         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1801         if (rc)
1802                 return rc;
1803
1804         /* Only record user xattr changes */
1805         if ((strncmp("user.", name, 5) == 0))
1806                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1807
1808         return rc;
1809 }
1810
1811 /**
1812  * The caller should guarantee to update the object ctime
1813  * after xattr_set if needed.
1814  */
1815 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1816                          const struct lu_buf *buf, const char *name,
1817                          int fl)
1818 {
1819         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1820         struct mdd_device *mdd = mdo2mdd(obj);
1821         struct thandle *handle;
1822         int  rc;
1823         ENTRY;
1824
1825         rc = mdd_xattr_sanity_check(env, mdd_obj);
1826         if (rc)
1827                 RETURN(rc);
1828
1829         handle = mdd_trans_create(env, mdd);
1830         if (IS_ERR(handle))
1831                 RETURN(PTR_ERR(handle));
1832
1833         /* security-replated changes may require sync */
1834         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1835             mdd->mdd_sync_permission == 1)
1836                 handle->th_sync = 1;
1837
1838         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1839         if (rc)
1840                 GOTO(stop, rc);
1841
1842         rc = mdd_trans_start(env, mdd, handle);
1843         if (rc)
1844                 GOTO(stop, rc);
1845
1846         /* security-replated changes may require sync */
1847         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1848                 handle->th_sync |= mdd->mdd_sync_permission;
1849
1850         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1851
1852         /* Only record system & user xattr changes */
1853         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1854                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1855                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1856                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1857                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1858                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1859                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1860                                               handle);
1861
1862 stop:
1863         mdd_trans_stop(env, mdd, rc, handle);
1864
1865         RETURN(rc);
1866 }
1867
1868 static int mdd_declare_xattr_del(const struct lu_env *env,
1869                                  struct mdd_device *mdd,
1870                                  struct mdd_object *obj,
1871                                  const char *name,
1872                                  struct thandle *handle)
1873 {
1874         int rc;
1875
1876         rc = mdo_declare_xattr_del(env, obj, name, handle);
1877         if (rc)
1878                 return rc;
1879
1880         /* Only record user xattr changes */
1881         if ((strncmp("user.", name, 5) == 0))
1882                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1883
1884         return rc;
1885 }
1886
1887 /**
1888  * The caller should guarantee to update the object ctime
1889  * after xattr_set if needed.
1890  */
1891 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1892                   const char *name)
1893 {
1894         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1895         struct mdd_device *mdd = mdo2mdd(obj);
1896         struct thandle *handle;
1897         int  rc;
1898         ENTRY;
1899
1900         rc = mdd_xattr_sanity_check(env, mdd_obj);
1901         if (rc)
1902                 RETURN(rc);
1903
1904         handle = mdd_trans_create(env, mdd);
1905         if (IS_ERR(handle))
1906                 RETURN(PTR_ERR(handle));
1907
1908         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1909         if (rc)
1910                 GOTO(stop, rc);
1911
1912         rc = mdd_trans_start(env, mdd, handle);
1913         if (rc)
1914                 GOTO(stop, rc);
1915
1916         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1917         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1918                            mdd_object_capa(env, mdd_obj));
1919         mdd_write_unlock(env, mdd_obj);
1920
1921         /* Only record system & user xattr changes */
1922         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1923                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1924                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1925                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1926                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1927                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1928                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1929                                               handle);
1930
1931 stop:
1932         mdd_trans_stop(env, mdd, rc, handle);
1933
1934         RETURN(rc);
1935 }
1936
1937 /* partial unlink */
1938 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1939                        struct md_attr *ma)
1940 {
1941         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1942         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1943         struct mdd_device *mdd = mdo2mdd(obj);
1944         struct thandle *handle;
1945 #ifdef HAVE_QUOTA_SUPPORT
1946         struct obd_device *obd = mdd->mdd_obd_dev;
1947         struct mds_obd *mds = &obd->u.mds;
1948         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1949         int quota_opc = 0;
1950 #endif
1951         int rc;
1952         ENTRY;
1953
1954         /* XXX: this code won't be used ever:
1955          * DNE uses slightly different approach */
1956         LBUG();
1957
1958         /*
1959          * Check -ENOENT early here because we need to get object type
1960          * to calculate credits before transaction start
1961          */
1962         if (mdd_object_exists(mdd_obj) == 0) {
1963                 CERROR("%s: object "DFID" not found: rc = -2\n",
1964                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1965                 RETURN(-ENOENT);
1966         }
1967
1968         LASSERT(mdd_object_exists(mdd_obj) > 0);
1969
1970         handle = mdd_trans_create(env, mdd);
1971         if (IS_ERR(handle))
1972                 RETURN(-ENOMEM);
1973
1974         rc = mdd_trans_start(env, mdd, handle);
1975
1976         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1977
1978         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1979         if (rc)
1980                 GOTO(cleanup, rc);
1981
1982         mdo_ref_del(env, mdd_obj, handle);
1983
1984         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1985                 /* unlink dot */
1986                 mdo_ref_del(env, mdd_obj, handle);
1987         }
1988
1989         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1990         la_copy->la_ctime = ma->ma_attr.la_ctime;
1991
1992         la_copy->la_valid = LA_CTIME;
1993         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1994         if (rc)
1995                 GOTO(cleanup, rc);
1996
1997         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1998 #ifdef HAVE_QUOTA_SUPPORT
1999         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2000             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2001                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2002                 mdd_quota_wrapper(&ma->ma_attr, qids);
2003         }
2004 #endif
2005
2006
2007         EXIT;
2008 cleanup:
2009         mdd_write_unlock(env, mdd_obj);
2010         mdd_trans_stop(env, mdd, rc, handle);
2011 #ifdef HAVE_QUOTA_SUPPORT
2012         if (quota_opc)
2013                 /* Trigger dqrel on the owner of child. If failed,
2014                  * the next call for lquota_chkquota will process it */
2015                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2016                               quota_opc);
2017 #endif
2018         return rc;
2019 }
2020
2021 /* partial operation */
2022 static int mdd_oc_sanity_check(const struct lu_env *env,
2023                                struct mdd_object *obj,
2024                                struct md_attr *ma)
2025 {
2026         int rc;
2027         ENTRY;
2028
2029         switch (ma->ma_attr.la_mode & S_IFMT) {
2030         case S_IFREG:
2031         case S_IFDIR:
2032         case S_IFLNK:
2033         case S_IFCHR:
2034         case S_IFBLK:
2035         case S_IFIFO:
2036         case S_IFSOCK:
2037                 rc = 0;
2038                 break;
2039         default:
2040                 rc = -EINVAL;
2041                 break;
2042         }
2043         RETURN(rc);
2044 }
2045
2046 static int mdd_object_create(const struct lu_env *env,
2047                              struct md_object *obj,
2048                              const struct md_op_spec *spec,
2049                              struct md_attr *ma)
2050 {
2051
2052         struct mdd_device *mdd = mdo2mdd(obj);
2053         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2054         const struct lu_fid *pfid = spec->u.sp_pfid;
2055         struct thandle *handle;
2056 #ifdef HAVE_QUOTA_SUPPORT
2057         struct obd_device *obd = mdd->mdd_obd_dev;
2058         struct obd_export *exp = md_quota(env)->mq_exp;
2059         struct mds_obd *mds = &obd->u.mds;
2060         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2061         int quota_opc = 0, block_count = 0;
2062         int inode_pending[MAXQUOTAS] = { 0, 0 };
2063         int block_pending[MAXQUOTAS] = { 0, 0 };
2064 #endif
2065         int rc = 0;
2066         ENTRY;
2067
2068         /* XXX: this code won't be used ever:
2069          * DNE uses slightly different approach */
2070         LBUG();
2071
2072 #ifdef HAVE_QUOTA_SUPPORT
2073         if (mds->mds_quota) {
2074                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2075                 mdd_quota_wrapper(&ma->ma_attr, qids);
2076                 /* get file quota for child */
2077                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2078                                 qids, inode_pending, 1, NULL, 0,
2079                                 NULL, 0);
2080                 switch (ma->ma_attr.la_mode & S_IFMT) {
2081                 case S_IFLNK:
2082                 case S_IFDIR:
2083                         block_count = 2;
2084                         break;
2085                 case S_IFREG:
2086                         block_count = 1;
2087                         break;
2088                 }
2089                 /* get block quota for child */
2090                 if (block_count)
2091                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2092                                         qids, block_pending, block_count,
2093                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2094         }
2095 #endif
2096
2097         handle = mdd_trans_create(env, mdd);
2098         if (IS_ERR(handle))
2099                 GOTO(out_pending, rc = PTR_ERR(handle));
2100
2101         rc = mdd_trans_start(env, mdd, handle);
2102
2103         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2104         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2105         if (rc)
2106                 GOTO(unlock, rc);
2107
2108         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2109         if (rc)
2110                 GOTO(unlock, rc);
2111
2112         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2113                 /* If creating the slave object, set slave EA here. */
2114                 int lmv_size = spec->u.sp_ea.eadatalen;
2115                 struct lmv_stripe_md *lmv;
2116
2117                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2118                 LASSERT(lmv != NULL && lmv_size > 0);
2119
2120                 rc = __mdd_xattr_set(env, mdd_obj,
2121                                      mdd_buf_get_const(env, lmv, lmv_size),
2122                                      XATTR_NAME_LMV, 0, handle);
2123                 if (rc)
2124                         GOTO(unlock, rc);
2125
2126                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2127                                            handle, 0);
2128         } else {
2129 #ifdef CONFIG_FS_POSIX_ACL
2130                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2131                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2132
2133                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2134                         buf->lb_len = spec->u.sp_ea.eadatalen;
2135                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2136                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2137                                                     &ma->ma_attr.la_mode,
2138                                                     handle);
2139                                 if (rc)
2140                                         GOTO(unlock, rc);
2141                                 else
2142                                         ma->ma_attr.la_valid |= LA_MODE;
2143                         }
2144
2145                         pfid = spec->u.sp_ea.fid;
2146                 }
2147 #endif
2148                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2149                                            spec);
2150         }
2151         EXIT;
2152 unlock:
2153         if (rc == 0)
2154                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2155         mdd_write_unlock(env, mdd_obj);
2156
2157         mdd_trans_stop(env, mdd, rc, handle);
2158 out_pending:
2159 #ifdef HAVE_QUOTA_SUPPORT
2160         if (quota_opc) {
2161                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2162                                       inode_pending, 0);
2163                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2164                                       block_pending, 1);
2165                 /* Trigger dqacq on the owner of child. If failed,
2166                  * the next call for lquota_chkquota will process it. */
2167                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2168                               quota_opc);
2169         }
2170 #endif
2171         return rc;
2172 }
2173
2174 /* partial link */
2175 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2176                        const struct md_attr *ma)
2177 {
2178         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2179         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2180         struct mdd_device *mdd = mdo2mdd(obj);
2181         struct thandle *handle;
2182         int rc;
2183         ENTRY;
2184
2185         /* XXX: this code won't be used ever:
2186          * DNE uses slightly different approach */
2187         LBUG();
2188
2189         handle = mdd_trans_create(env, mdd);
2190         if (IS_ERR(handle))
2191                 RETURN(-ENOMEM);
2192
2193         rc = mdd_trans_start(env, mdd, handle);
2194
2195         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2196         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2197         if (rc == 0)
2198                 mdo_ref_add(env, mdd_obj, handle);
2199         mdd_write_unlock(env, mdd_obj);
2200         if (rc == 0) {
2201                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2202                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2203
2204                 la_copy->la_valid = LA_CTIME;
2205                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2206                                                         handle, 0);
2207         }
2208         mdd_trans_stop(env, mdd, 0, handle);
2209
2210         RETURN(rc);
2211 }
2212
2213 /*
2214  * do NOT or the MAY_*'s, you'll get the weakest
2215  */
2216 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2217 {
2218         int res = 0;
2219
2220         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2221          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2222          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2223          * owner can write to a file even if it is marked readonly to hide
2224          * its brokenness. (bug 5781) */
2225         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2226                 struct md_ucred *uc = md_ucred(env);
2227
2228                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2229                     (la->la_uid == uc->mu_fsuid))
2230                         return 0;
2231         }
2232
2233         if (flags & FMODE_READ)
2234                 res |= MAY_READ;
2235         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2236                 res |= MAY_WRITE;
2237         if (flags & MDS_FMODE_EXEC)
2238                 res = MAY_EXEC;
2239         return res;
2240 }
2241
2242 static int mdd_open_sanity_check(const struct lu_env *env,
2243                                  struct mdd_object *obj, int flag)
2244 {
2245         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2246         int mode, rc;
2247         ENTRY;
2248
2249         /* EEXIST check */
2250         if (mdd_is_dead_obj(obj))
2251                 RETURN(-ENOENT);
2252
2253         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2254         if (rc)
2255                RETURN(rc);
2256
2257         if (S_ISLNK(tmp_la->la_mode))
2258                 RETURN(-ELOOP);
2259
2260         mode = accmode(env, tmp_la, flag);
2261
2262         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2263                 RETURN(-EISDIR);
2264
2265         if (!(flag & MDS_OPEN_CREATED)) {
2266                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2267                 if (rc)
2268                         RETURN(rc);
2269         }
2270
2271         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2272             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2273                 flag &= ~MDS_OPEN_TRUNC;
2274
2275         /* For writing append-only file must open it with append mode. */
2276         if (mdd_is_append(obj)) {
2277                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2278                         RETURN(-EPERM);
2279                 if (flag & MDS_OPEN_TRUNC)
2280                         RETURN(-EPERM);
2281         }
2282
2283 #if 0
2284         /*
2285          * Now, flag -- O_NOATIME does not be packed by client.
2286          */
2287         if (flag & O_NOATIME) {
2288                 struct md_ucred *uc = md_ucred(env);
2289
2290                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2291                     (uc->mu_valid == UCRED_NEW)) &&
2292                     (uc->mu_fsuid != tmp_la->la_uid) &&
2293                     !mdd_capable(uc, CFS_CAP_FOWNER))
2294                         RETURN(-EPERM);
2295         }
2296 #endif
2297
2298         RETURN(0);
2299 }
2300
2301 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2302                     int flags)
2303 {
2304         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2305         int rc = 0;
2306
2307         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2308
2309         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2310         if (rc == 0)
2311                 mdd_obj->mod_count++;
2312
2313         mdd_write_unlock(env, mdd_obj);
2314         return rc;
2315 }
2316
2317 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2318                             struct md_attr *ma, struct thandle *handle)
2319 {
2320         int rc;
2321
2322         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2323         if (rc)
2324                 return rc;
2325
2326         return mdo_declare_destroy(env, obj, handle);
2327 }
2328
2329 /* return md_attr back,
2330  * if it is last unlink then return lov ea + llog cookie*/
2331 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2332                     struct md_attr *ma, struct thandle *handle)
2333 {
2334         int rc = 0;
2335         ENTRY;
2336
2337         if (S_ISREG(mdd_object_type(obj))) {
2338                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2339                  * Caller must be ready for that. */
2340
2341                 rc = __mdd_lmm_get(env, obj, ma);
2342                 if ((ma->ma_valid & MA_LOV))
2343                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2344                                             obj, ma);
2345         }
2346
2347         if (rc == 0)
2348                 rc = mdo_destroy(env, obj, handle);
2349
2350         RETURN(rc);
2351 }
2352
2353 static int mdd_declare_close(const struct lu_env *env,
2354                              struct mdd_object *obj,
2355                              struct md_attr *ma,
2356                              struct thandle *handle)
2357 {
2358         int rc;
2359
2360         rc = orph_declare_index_delete(env, obj, handle);
2361         if (rc)
2362                 return rc;
2363
2364         return mdd_declare_object_kill(env, obj, ma, handle);
2365 }
2366
2367 /*
2368  * No permission check is needed.
2369  */
2370 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2371                      struct md_attr *ma, int mode)
2372 {
2373         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2374         struct mdd_device *mdd = mdo2mdd(obj);
2375         struct thandle    *handle = NULL;
2376         int rc;
2377         int is_orphan = 0, reset = 1;
2378
2379 #ifdef HAVE_QUOTA_SUPPORT
2380         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2381         struct mds_obd *mds = &obd->u.mds;
2382         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2383         int quota_opc = 0;
2384 #endif
2385         ENTRY;
2386
2387         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2388                 mdd_obj->mod_count--;
2389
2390                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2391                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2392                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2393                 RETURN(0);
2394         }
2395
2396         /* check without any lock */
2397         if (mdd_obj->mod_count == 1 &&
2398             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2399  again:
2400                 handle = mdd_trans_create(env, mdo2mdd(obj));
2401                 if (IS_ERR(handle))
2402                         RETURN(PTR_ERR(handle));
2403
2404                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2405                 if (rc)
2406                         GOTO(stop, rc);
2407
2408                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2409                 if (rc)
2410                         GOTO(stop, rc);
2411
2412                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2413                 if (rc)
2414                         GOTO(stop, rc);
2415         }
2416
2417         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2418         if (handle == NULL && mdd_obj->mod_count == 1 &&
2419             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2420                 mdd_write_unlock(env, mdd_obj);
2421                 goto again;
2422         }
2423
2424         /* release open count */
2425         mdd_obj->mod_count --;
2426
2427         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2428                 /* remove link to object from orphan index */
2429                 LASSERT(handle != NULL);
2430                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2431                 if (rc == 0) {
2432                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2433                                "list, OSS objects to be destroyed.\n",
2434                                PFID(mdd_object_fid(mdd_obj)));
2435                         is_orphan = 1;
2436                 } else {
2437                         CERROR("Object "DFID" can not be deleted from orphan "
2438                                 "list, maybe cause OST objects can not be "
2439                                 "destroyed (err: %d).\n",
2440                                 PFID(mdd_object_fid(mdd_obj)), rc);
2441                         /* If object was not deleted from orphan list, do not
2442                          * destroy OSS objects, which will be done when next
2443                          * recovery. */
2444                         GOTO(out, rc);
2445                 }
2446         }
2447
2448         rc = mdd_iattr_get(env, mdd_obj, ma);
2449         /* Object maybe not in orphan list originally, it is rare case for
2450          * mdd_finish_unlink() failure. */
2451         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2452 #ifdef HAVE_QUOTA_SUPPORT
2453                 if (mds->mds_quota) {
2454                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2455                         mdd_quota_wrapper(&ma->ma_attr, qids);
2456                 }
2457 #endif
2458                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2459                 if (ma->ma_valid & MA_FLAGS &&
2460                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2461                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2462                 } else {
2463                         if (handle == NULL) {
2464                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2465                                 if (IS_ERR(handle))
2466                                         GOTO(out, rc = PTR_ERR(handle));
2467
2468                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2469                                                              handle);
2470                                 if (rc)
2471                                         GOTO(out, rc);
2472
2473                                 rc = mdd_declare_changelog_store(env, mdd,
2474                                                                  NULL, handle);
2475                                 if (rc)
2476                                         GOTO(stop, rc);
2477
2478                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2479                                 if (rc)
2480                                         GOTO(out, rc);
2481                         }
2482
2483                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2484                         if (rc == 0)
2485                                 reset = 0;
2486                 }
2487
2488                 if (rc != 0)
2489                         CERROR("Error when prepare to delete Object "DFID" , "
2490                                "which will cause OST objects can not be "
2491                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2492         }
2493         EXIT;
2494
2495 out:
2496         if (reset)
2497                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2498
2499         mdd_write_unlock(env, mdd_obj);
2500
2501         if (rc == 0 &&
2502             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2503             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2504                 if (handle == NULL) {
2505                         handle = mdd_trans_create(env, mdo2mdd(obj));
2506                         if (IS_ERR(handle))
2507                                 GOTO(stop, rc = IS_ERR(handle));
2508
2509                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2510                                                          handle);
2511                         if (rc)
2512                                 GOTO(stop, rc);
2513
2514                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2515                         if (rc)
2516                                 GOTO(stop, rc);
2517                 }
2518
2519                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2520                                          mdd_obj, handle);
2521         }
2522
2523 stop:
2524         if (handle != NULL)
2525                 mdd_trans_stop(env, mdd, rc, handle);
2526 #ifdef HAVE_QUOTA_SUPPORT
2527         if (quota_opc)
2528                 /* Trigger dqrel on the owner of child. If failed,
2529                  * the next call for lquota_chkquota will process it */
2530                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2531                               quota_opc);
2532 #endif
2533         return rc;
2534 }
2535
2536 /*
2537  * Permission check is done when open,
2538  * no need check again.
2539  */
2540 static int mdd_readpage_sanity_check(const struct lu_env *env,
2541                                      struct mdd_object *obj)
2542 {
2543         struct dt_object *next = mdd_object_child(obj);
2544         int rc;
2545         ENTRY;
2546
2547         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2548                 rc = 0;
2549         else
2550                 rc = -ENOTDIR;
2551
2552         RETURN(rc);
2553 }
2554
2555 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2556                               struct lu_dirpage *dp, int nob,
2557                               const struct dt_it_ops *iops, struct dt_it *it,
2558                               __u32 attr)
2559 {
2560         void                   *area = dp;
2561         int                     result;
2562         __u64                   hash = 0;
2563         struct lu_dirent       *ent;
2564         struct lu_dirent       *last = NULL;
2565         int                     first = 1;
2566
2567         memset(area, 0, sizeof (*dp));
2568         area += sizeof (*dp);
2569         nob  -= sizeof (*dp);
2570
2571         ent  = area;
2572         do {
2573                 int    len;
2574                 int    recsize;
2575
2576                 len = iops->key_size(env, it);
2577
2578                 /* IAM iterator can return record with zero len. */
2579                 if (len == 0)
2580                         goto next;
2581
2582                 hash = iops->store(env, it);
2583                 if (unlikely(first)) {
2584                         first = 0;
2585                         dp->ldp_hash_start = cpu_to_le64(hash);
2586                 }
2587
2588                 /* calculate max space required for lu_dirent */
2589                 recsize = lu_dirent_calc_size(len, attr);
2590
2591                 if (nob >= recsize) {
2592                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2593                         if (result == -ESTALE)
2594                                 goto next;
2595                         if (result != 0)
2596                                 goto out;
2597
2598                         /* osd might not able to pack all attributes,
2599                          * so recheck rec length */
2600                         recsize = le16_to_cpu(ent->lde_reclen);
2601                 } else {
2602                         result = (last != NULL) ? 0 :-EINVAL;
2603                         goto out;
2604                 }
2605                 last = ent;
2606                 ent = (void *)ent + recsize;
2607                 nob -= recsize;
2608
2609 next:
2610                 result = iops->next(env, it);
2611                 if (result == -ESTALE)
2612                         goto next;
2613         } while (result == 0);
2614
2615 out:
2616         dp->ldp_hash_end = cpu_to_le64(hash);
2617         if (last != NULL) {
2618                 if (last->lde_hash == dp->ldp_hash_end)
2619                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2620                 last->lde_reclen = 0; /* end mark */
2621         }
2622         return result;
2623 }
2624
2625 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2626                           const struct lu_rdpg *rdpg)
2627 {
2628         struct dt_it      *it;
2629         struct dt_object  *next = mdd_object_child(obj);
2630         const struct dt_it_ops  *iops;
2631         struct page       *pg;
2632         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2633         int i;
2634         int nlupgs = 0;
2635         int rc;
2636         int nob;
2637
2638         LASSERT(rdpg->rp_pages != NULL);
2639         LASSERT(next->do_index_ops != NULL);
2640
2641         if (rdpg->rp_count <= 0)
2642                 return -EFAULT;
2643
2644         /*
2645          * iterate through directory and fill pages from @rdpg
2646          */
2647         iops = &next->do_index_ops->dio_it;
2648         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2649         if (IS_ERR(it))
2650                 return PTR_ERR(it);
2651
2652         rc = iops->load(env, it, rdpg->rp_hash);
2653
2654         if (rc == 0) {
2655                 /*
2656                  * Iterator didn't find record with exactly the key requested.
2657                  *
2658                  * It is currently either
2659                  *
2660                  *     - positioned above record with key less than
2661                  *     requested---skip it.
2662                  *
2663                  *     - or not positioned at all (is in IAM_IT_SKEWED
2664                  *     state)---position it on the next item.
2665                  */
2666                 rc = iops->next(env, it);
2667         } else if (rc > 0)
2668                 rc = 0;
2669
2670         /*
2671          * At this point and across for-loop:
2672          *
2673          *  rc == 0 -> ok, proceed.
2674          *  rc >  0 -> end of directory.
2675          *  rc <  0 -> error.
2676          */
2677         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2678              i++, nob -= CFS_PAGE_SIZE) {
2679                 struct lu_dirpage *dp;
2680
2681                 LASSERT(i < rdpg->rp_npages);
2682                 pg = rdpg->rp_pages[i];
2683                 dp = cfs_kmap(pg);
2684 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2685 repeat:
2686 #endif
2687                 rc = mdd_dir_page_build(env, mdd, dp,
2688                                         min_t(int, nob, LU_PAGE_SIZE),
2689                                         iops, it, rdpg->rp_attrs);
2690                 if (rc > 0) {
2691                         /*
2692                          * end of directory.
2693                          */
2694                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2695                         nlupgs++;
2696                 } else if (rc < 0) {
2697                         CWARN("build page failed: %d!\n", rc);
2698                 } else {
2699                         nlupgs++;
2700 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2701                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2702                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2703                                 goto repeat;
2704 #endif
2705                 }
2706                 cfs_kunmap(pg);
2707         }
2708         if (rc >= 0) {
2709                 struct lu_dirpage *dp;
2710
2711                 dp = cfs_kmap(rdpg->rp_pages[0]);
2712                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2713                 if (nlupgs == 0) {
2714                         /*
2715                          * No pages were processed, mark this for first page
2716                          * and send back.
2717                          */
2718                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2719                         nlupgs = 1;
2720                 }
2721                 cfs_kunmap(rdpg->rp_pages[0]);
2722
2723                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2724         }
2725         iops->put(env, it);
2726         iops->fini(env, it);
2727
2728         return rc;
2729 }
2730
2731 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2732                  const struct lu_rdpg *rdpg)
2733 {
2734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2735         int rc;
2736         ENTRY;
2737
2738         if (mdd_object_exists(mdd_obj) == 0) {
2739                 CERROR("%s: object "DFID" not found: rc = -2\n",
2740                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2741                 return -ENOENT;
2742         }
2743
2744         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2745         rc = mdd_readpage_sanity_check(env, mdd_obj);
2746         if (rc)
2747                 GOTO(out_unlock, rc);
2748
2749         if (mdd_is_dead_obj(mdd_obj)) {
2750                 struct page *pg;
2751                 struct lu_dirpage *dp;
2752
2753                 /*
2754                  * According to POSIX, please do not return any entry to client:
2755                  * even dot and dotdot should not be returned.
2756                  */
2757                 CWARN("readdir from dead object: "DFID"\n",
2758                         PFID(mdd_object_fid(mdd_obj)));
2759
2760                 if (rdpg->rp_count <= 0)
2761                         GOTO(out_unlock, rc = -EFAULT);
2762                 LASSERT(rdpg->rp_pages != NULL);
2763
2764                 pg = rdpg->rp_pages[0];
2765                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2766                 memset(dp, 0 , sizeof(struct lu_dirpage));
2767                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2768                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2769                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2770                 cfs_kunmap(pg);
2771                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2772         }
2773
2774         rc = __mdd_readpage(env, mdd_obj, rdpg);
2775
2776         EXIT;
2777 out_unlock:
2778         mdd_read_unlock(env, mdd_obj);
2779         return rc;
2780 }
2781
2782 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2783 {
2784         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2785         struct dt_object *next;
2786
2787         if (mdd_object_exists(mdd_obj) == 0) {
2788                 CERROR("%s: object "DFID" not found: rc = -2\n",
2789                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2790                 return -ENOENT;
2791         }
2792         next = mdd_object_child(mdd_obj);
2793         return next->do_ops->do_object_sync(env, next);
2794 }
2795
2796 const struct md_object_operations mdd_obj_ops = {
2797         .moo_permission    = mdd_permission,
2798         .moo_attr_get      = mdd_attr_get,
2799         .moo_attr_set      = mdd_attr_set,
2800         .moo_xattr_get     = mdd_xattr_get,
2801         .moo_xattr_set     = mdd_xattr_set,
2802         .moo_xattr_list    = mdd_xattr_list,
2803         .moo_xattr_del     = mdd_xattr_del,
2804         .moo_object_create = mdd_object_create,
2805         .moo_ref_add       = mdd_ref_add,
2806         .moo_ref_del       = mdd_ref_del,
2807         .moo_open          = mdd_open,
2808         .moo_close         = mdd_close,
2809         .moo_readpage      = mdd_readpage,
2810         .moo_readlink      = mdd_readlink,
2811         .moo_changelog     = mdd_changelog,
2812         .moo_capa_get      = mdd_capa_get,
2813         .moo_object_sync   = mdd_object_sync,
2814         .moo_path          = mdd_path,
2815         .moo_file_lock     = mdd_file_lock,
2816         .moo_file_unlock   = mdd_file_unlock,
2817 };