Whamcloud - gitweb
583a29b2ae95b38b3e9262030b569dc7b0959b3e
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58 #include <obd_lov.h>
59
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
63
64 #include "mdd_internal.h"
65
66 static const struct lu_object_operations mdd_lu_obj_ops;
67
68 static int mdd_xattr_get(const struct lu_env *env,
69                          struct md_object *obj, struct lu_buf *buf,
70                          const char *name);
71
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73                  void **data)
74 {
75         if (mdd_object_exists(obj) == 0) {
76                 CERROR("%s: object "DFID" not found: rc = -2\n",
77                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
78                 return -ENOENT;
79         }
80         mdo_data_get(env, obj, data);
81         return 0;
82 }
83
84 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
85                struct lu_attr *la, struct lustre_capa *capa)
86 {
87         if (mdd_object_exists(obj) == 0) {
88                 CERROR("%s: object "DFID" not found: rc = -2\n",
89                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
90                 return -ENOENT;
91         }
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
130         buf->lb_buf = NULL;
131         buf->lb_len = 0;
132 }
133
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135                                        const void *area, ssize_t len)
136 {
137         struct lu_buf *buf;
138
139         buf = &mdd_env_info(env)->mti_buf;
140         buf->lb_buf = (void *)area;
141         buf->lb_len = len;
142         return buf;
143 }
144
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 {
147         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148
149         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
151                 buf->lb_buf = NULL;
152         }
153         if (buf->lb_buf == NULL) {
154                 buf->lb_len = len;
155                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156                 if (buf->lb_buf == NULL)
157                         buf->lb_len = 0;
158         }
159         return buf;
160 }
161
162 /** Increase the size of the \a mti_big_buf.
163  * preserves old data in buffer
164  * old buffer remains unchanged on error
165  * \retval 0 or -ENOMEM
166  */
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 {
169         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
170         struct lu_buf buf;
171
172         LASSERT(len >= oldbuf->lb_len);
173         OBD_ALLOC_LARGE(buf.lb_buf, len);
174
175         if (buf.lb_buf == NULL)
176                 return -ENOMEM;
177
178         buf.lb_len = len;
179         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180
181         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE_LARGE(mti->mti_max_cookie,
198                                        mti->mti_max_cookie_size);
199                 mti->mti_max_cookie = NULL;
200                 mti->mti_max_cookie_size = 0;
201         }
202         if (unlikely(mti->mti_max_cookie == NULL)) {
203                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204                 if (likely(mti->mti_max_cookie != NULL))
205                         mti->mti_max_cookie_size = max_cookie_size;
206         }
207         if (likely(mti->mti_max_cookie != NULL))
208                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209         return mti->mti_max_cookie;
210 }
211
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213                                    struct mdd_device *mdd)
214 {
215         struct mdd_thread_info *mti = mdd_env_info(env);
216         int                     max_lmm_size;
217
218         max_lmm_size = mdd_lov_mdsize(env, mdd);
219         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220                 if (mti->mti_max_lmm)
221                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222                 mti->mti_max_lmm = NULL;
223                 mti->mti_max_lmm_size = 0;
224         }
225         if (unlikely(mti->mti_max_lmm == NULL)) {
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = max_lmm_size;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234                                    const struct lu_object_header *hdr,
235                                    struct lu_device *d)
236 {
237         struct mdd_object *mdd_obj;
238
239         OBD_ALLOC_PTR(mdd_obj);
240         if (mdd_obj != NULL) {
241                 struct lu_object *o;
242
243                 o = mdd2lu_obj(mdd_obj);
244                 lu_object_init(o, NULL, d);
245                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247                 mdd_obj->mod_count = 0;
248                 o->lo_ops = &mdd_lu_obj_ops;
249                 return o;
250         } else {
251                 return NULL;
252         }
253 }
254
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256                            const struct lu_object_conf *unused)
257 {
258         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259         struct mdd_object *mdd_obj = lu2mdd_obj(o);
260         struct lu_object  *below;
261         struct lu_device  *under;
262         ENTRY;
263
264         mdd_obj->mod_cltime = 0;
265         under = &d->mdd_child->dd_lu_dev;
266         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267         mdd_pdlock_init(mdd_obj);
268         if (below == NULL)
269                 RETURN(-ENOMEM);
270
271         lu_object_add(o, below);
272
273         RETURN(0);
274 }
275
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 {
278         if (lu_object_exists(o))
279                 return mdd_get_flags(env, lu2mdd_obj(o));
280         else
281                 return 0;
282 }
283
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 {
286         struct mdd_object *mdd = lu2mdd_obj(o);
287
288         lu_object_fini(o);
289         OBD_FREE_PTR(mdd);
290 }
291
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293                             lu_printer_t p, const struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297                     "valid=%x, cltime="LPU64", flags=%lx)",
298                     mdd, mdd->mod_count, mdd->mod_valid,
299                     mdd->mod_cltime, mdd->mod_flags);
300 }
301
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303         .loo_object_init    = mdd_object_init,
304         .loo_object_start   = mdd_object_start,
305         .loo_object_free    = mdd_object_free,
306         .loo_object_print   = mdd_object_print,
307 };
308
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310                                    struct mdd_device *d,
311                                    const struct lu_fid *f)
312 {
313         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
314 }
315
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317                         const char *path, struct lu_fid *fid)
318 {
319         struct lu_buf *buf;
320         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321         struct mdd_object *obj;
322         struct lu_name *lname = &mdd_env_info(env)->mti_name;
323         char *name;
324         int rc = 0;
325         ENTRY;
326
327         /* temp buffer for path element */
328         buf = mdd_buf_alloc(env, PATH_MAX);
329         if (buf->lb_buf == NULL)
330                 RETURN(-ENOMEM);
331
332         lname->ln_name = name = buf->lb_buf;
333         lname->ln_namelen = 0;
334         *f = mdd->mdd_root_fid;
335
336         while(1) {
337                 while (*path == '/')
338                         path++;
339                 if (*path == '\0')
340                         break;
341                 while (*path != '/' && *path != '\0') {
342                         *name = *path;
343                         path++;
344                         name++;
345                         lname->ln_namelen++;
346                 }
347
348                 *name = '\0';
349                 /* find obj corresponding to fid */
350                 obj = mdd_object_find(env, mdd, f);
351                 if (obj == NULL)
352                         GOTO(out, rc = -EREMOTE);
353                 if (IS_ERR(obj))
354                         GOTO(out, rc = PTR_ERR(obj));
355                 /* get child fid from parent and name */
356                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357                 mdd_object_put(env, obj);
358                 if (rc)
359                         break;
360
361                 name = buf->lb_buf;
362                 lname->ln_namelen = 0;
363         }
364
365         if (!rc)
366                 *fid = *f;
367 out:
368         RETURN(rc);
369 }
370
371 /** The maximum depth that fid2path() will search.
372  * This is limited only because we want to store the fids for
373  * historical path lookup purposes.
374  */
375 #define MAX_PATH_DEPTH 100
376
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379         __u64                pli_recno;        /**< history point */
380         __u64                pli_currec;       /**< current record */
381         struct lu_fid        pli_fid;
382         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383         struct mdd_object   *pli_mdd_obj;
384         char                *pli_path;         /**< full path */
385         int                  pli_pathlen;
386         int                  pli_linkno;       /**< which hardlink to follow */
387         int                  pli_fidcount;     /**< number of \a pli_fids */
388 };
389
390 static int mdd_path_current(const struct lu_env *env,
391                             struct path_lookup_info *pli)
392 {
393         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394         struct mdd_object *mdd_obj;
395         struct lu_buf     *buf = NULL;
396         struct link_ea_header *leh;
397         struct link_ea_entry  *lee;
398         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
400         char *ptr;
401         int reclen;
402         int rc;
403         ENTRY;
404
405         ptr = pli->pli_path + pli->pli_pathlen - 1;
406         *ptr = 0;
407         --ptr;
408         pli->pli_fidcount = 0;
409         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410
411         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412                 mdd_obj = mdd_object_find(env, mdd,
413                                           &pli->pli_fids[pli->pli_fidcount]);
414                 if (mdd_obj == NULL)
415                         GOTO(out, rc = -EREMOTE);
416                 if (IS_ERR(mdd_obj))
417                         GOTO(out, rc = PTR_ERR(mdd_obj));
418                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419                 if (rc <= 0) {
420                         mdd_object_put(env, mdd_obj);
421                         if (rc == -1)
422                                 rc = -EREMOTE;
423                         else if (rc == 0)
424                                 /* Do I need to error out here? */
425                                 rc = -ENOENT;
426                         GOTO(out, rc);
427                 }
428
429                 /* Get parent fid and object name */
430                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431                 buf = mdd_links_get(env, mdd_obj);
432                 mdd_read_unlock(env, mdd_obj);
433                 mdd_object_put(env, mdd_obj);
434                 if (IS_ERR(buf))
435                         GOTO(out, rc = PTR_ERR(buf));
436
437                 leh = buf->lb_buf;
438                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440
441                 /* If set, use link #linkno for path lookup, otherwise use
442                    link #0.  Only do this for the final path element. */
443                 if ((pli->pli_fidcount == 0) &&
444                     (pli->pli_linkno < leh->leh_reccount)) {
445                         int count;
446                         for (count = 0; count < pli->pli_linkno; count++) {
447                                 lee = (struct link_ea_entry *)
448                                      ((char *)lee + reclen);
449                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450                         }
451                         if (pli->pli_linkno < leh->leh_reccount - 1)
452                                 /* indicate to user there are more links */
453                                 pli->pli_linkno++;
454                 }
455
456                 /* Pack the name in the end of the buffer */
457                 ptr -= tmpname->ln_namelen;
458                 if (ptr - 1 <= pli->pli_path)
459                         GOTO(out, rc = -EOVERFLOW);
460                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
461                 *(--ptr) = '/';
462
463                 /* Store the parent fid for historic lookup */
464                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465                         GOTO(out, rc = -EOVERFLOW);
466                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467         }
468
469         /* Verify that our path hasn't changed since we started the lookup.
470            Record the current index, and verify the path resolves to the
471            same fid. If it does, then the path is correct as of this index. */
472         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473         pli->pli_currec = mdd->mdd_cl.mc_index;
474         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476         if (rc) {
477                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478                 GOTO (out, rc = -EAGAIN);
479         }
480         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483                        PFID(&pli->pli_fid));
484                 GOTO(out, rc = -EAGAIN);
485         }
486         ptr++; /* skip leading / */
487         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
488
489         EXIT;
490 out:
491         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492                 /* if we vmalloced a large buffer drop it */
493                 mdd_buf_put(buf);
494
495         return rc;
496 }
497
498 static int mdd_path_historic(const struct lu_env *env,
499                              struct path_lookup_info *pli)
500 {
501         return 0;
502 }
503
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506                     char *path, int pathlen, __u64 *recno, int *linkno)
507 {
508         struct path_lookup_info *pli;
509         int tries = 3;
510         int rc = -EAGAIN;
511         ENTRY;
512
513         if (pathlen < 3)
514                 RETURN(-EOVERFLOW);
515
516         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
517                 path[0] = '\0';
518                 RETURN(0);
519         }
520
521         OBD_ALLOC_PTR(pli);
522         if (pli == NULL)
523                 RETURN(-ENOMEM);
524
525         pli->pli_mdd_obj = md2mdd_obj(obj);
526         pli->pli_recno = *recno;
527         pli->pli_path = path;
528         pli->pli_pathlen = pathlen;
529         pli->pli_linkno = *linkno;
530
531         /* Retry multiple times in case file is being moved */
532         while (tries-- && rc == -EAGAIN)
533                 rc = mdd_path_current(env, pli);
534
535         /* For historical path lookup, the current links may not have existed
536          * at "recno" time.  We must switch over to earlier links/parents
537          * by using the changelog records.  If the earlier parent doesn't
538          * exist, we must search back through the changelog to reconstruct
539          * its parents, then check if it exists, etc.
540          * We may ignore this problem for the initial implementation and
541          * state that an "original" hardlink must still exist for us to find
542          * historic path name. */
543         if (pli->pli_recno != -1) {
544                 rc = mdd_path_historic(env, pli);
545         } else {
546                 *recno = pli->pli_currec;
547                 /* Return next link index to caller */
548                 *linkno = pli->pli_linkno;
549         }
550
551         OBD_FREE_PTR(pli);
552
553         RETURN (rc);
554 }
555
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 {
558         struct lu_attr *la = &mdd_env_info(env)->mti_la;
559         int rc;
560
561         ENTRY;
562         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563         if (rc == 0) {
564                 mdd_flags_xlate(obj, la->la_flags);
565         }
566         RETURN(rc);
567 }
568
569 /* get only inode attributes */
570 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
571                   struct md_attr *ma)
572 {
573         int rc = 0;
574         ENTRY;
575
576         if (ma->ma_valid & MA_INODE)
577                 RETURN(0);
578
579         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
580                           mdd_object_capa(env, mdd_obj));
581         if (rc == 0)
582                 ma->ma_valid |= MA_INODE;
583         RETURN(rc);
584 }
585
586 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
587 {
588         struct lov_desc *ldesc;
589         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
590         struct lov_user_md *lum = (struct lov_user_md*)lmm;
591         ENTRY;
592
593         if (!lum)
594                 RETURN(0);
595
596         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
597         LASSERT(ldesc != NULL);
598
599         lum->lmm_magic = LOV_MAGIC_V1;
600         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
601         lum->lmm_pattern = ldesc->ld_pattern;
602         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
603         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
604         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
605
606         RETURN(sizeof(*lum));
607 }
608
609 static int is_rootdir(struct mdd_object *mdd_obj)
610 {
611         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
612         const struct lu_fid *fid = mdo2fid(mdd_obj);
613
614         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
615 }
616
617 /* get lov EA only */
618 static int __mdd_lmm_get(const struct lu_env *env,
619                          struct mdd_object *mdd_obj, struct md_attr *ma)
620 {
621         int rc;
622         ENTRY;
623
624         if (ma->ma_valid & MA_LOV)
625                 RETURN(0);
626
627         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
628                         XATTR_NAME_LOV);
629         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
630                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
631         if (rc > 0) {
632                 ma->ma_lmm_size = rc;
633                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
634                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
635                 rc = 0;
636         }
637         RETURN(rc);
638 }
639
640 /* get the first parent fid from link EA */
641 static int mdd_pfid_get(const struct lu_env *env,
642                         struct mdd_object *mdd_obj, struct md_attr *ma)
643 {
644         struct lu_buf *buf;
645         struct link_ea_header *leh;
646         struct link_ea_entry *lee;
647         struct lu_fid *pfid = &ma->ma_pfid;
648         ENTRY;
649
650         if (ma->ma_valid & MA_PFID)
651                 RETURN(0);
652
653         buf = mdd_links_get(env, mdd_obj);
654         if (IS_ERR(buf))
655                 RETURN(PTR_ERR(buf));
656
657         leh = buf->lb_buf;
658         lee = (struct link_ea_entry *)(leh + 1);
659         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
660         fid_be_to_cpu(pfid, pfid);
661         ma->ma_valid |= MA_PFID;
662         if (buf->lb_len > OBD_ALLOC_BIG)
663                 /* if we vmalloced a large buffer drop it */
664                 mdd_buf_put(buf);
665         RETURN(0);
666 }
667
668 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
669                        struct md_attr *ma)
670 {
671         int rc;
672         ENTRY;
673
674         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
675         rc = __mdd_lmm_get(env, mdd_obj, ma);
676         mdd_read_unlock(env, mdd_obj);
677         RETURN(rc);
678 }
679
680 /* get lmv EA only*/
681 static int __mdd_lmv_get(const struct lu_env *env,
682                          struct mdd_object *mdd_obj, struct md_attr *ma)
683 {
684         int rc;
685         ENTRY;
686
687         if (ma->ma_valid & MA_LMV)
688                 RETURN(0);
689
690         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
691                         XATTR_NAME_LMV);
692         if (rc > 0) {
693                 ma->ma_valid |= MA_LMV;
694                 rc = 0;
695         }
696         RETURN(rc);
697 }
698
699 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
700                          struct md_attr *ma)
701 {
702         struct mdd_thread_info *info = mdd_env_info(env);
703         struct lustre_mdt_attrs *lma =
704                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
705         int lma_size;
706         int rc;
707         ENTRY;
708
709         /* If all needed data are already valid, nothing to do */
710         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
711             (ma->ma_need & (MA_HSM | MA_SOM)))
712                 RETURN(0);
713
714         /* Read LMA from disk EA */
715         lma_size = sizeof(info->mti_xattr_buf);
716         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
717         if (rc <= 0)
718                 RETURN(rc);
719
720         /* Useless to check LMA incompatibility because this is already done in
721          * osd_ea_fid_get(), and this will fail long before this code is
722          * called.
723          * So, if we are here, LMA is compatible.
724          */
725
726         lustre_lma_swab(lma);
727
728         /* Swab and copy LMA */
729         if (ma->ma_need & MA_HSM) {
730                 if (lma->lma_compat & LMAC_HSM)
731                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
732                 else
733                         ma->ma_hsm.mh_flags = 0;
734                 ma->ma_valid |= MA_HSM;
735         }
736
737         /* Copy SOM */
738         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
739                 LASSERT(ma->ma_som != NULL);
740                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
741                 ma->ma_som->msd_size    = lma->lma_som_size;
742                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
743                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
744                 ma->ma_valid |= MA_SOM;
745         }
746
747         RETURN(0);
748 }
749
750 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
751                                  struct md_attr *ma)
752 {
753         int rc = 0;
754         ENTRY;
755
756         if (ma->ma_need & MA_INODE)
757                 rc = mdd_iattr_get(env, mdd_obj, ma);
758
759         if (rc == 0 && ma->ma_need & MA_LOV) {
760                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
761                     S_ISDIR(mdd_object_type(mdd_obj)))
762                         rc = __mdd_lmm_get(env, mdd_obj, ma);
763         }
764         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
765                 if (S_ISREG(mdd_object_type(mdd_obj)))
766                         rc = mdd_pfid_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & MA_LMV) {
769                 if (S_ISDIR(mdd_object_type(mdd_obj)))
770                         rc = __mdd_lmv_get(env, mdd_obj, ma);
771         }
772         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
773                 if (S_ISREG(mdd_object_type(mdd_obj)))
774                         rc = __mdd_lma_get(env, mdd_obj, ma);
775         }
776 #ifdef CONFIG_FS_POSIX_ACL
777         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
778                 if (S_ISDIR(mdd_object_type(mdd_obj)))
779                         rc = mdd_def_acl_get(env, mdd_obj, ma);
780         }
781 #endif
782         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
783                rc, ma->ma_valid, ma->ma_lmm);
784         RETURN(rc);
785 }
786
787 int mdd_attr_get_internal_locked(const struct lu_env *env,
788                                  struct mdd_object *mdd_obj, struct md_attr *ma)
789 {
790         int rc;
791         int needlock = ma->ma_need &
792                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
793
794         if (needlock)
795                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
796         rc = mdd_attr_get_internal(env, mdd_obj, ma);
797         if (needlock)
798                 mdd_read_unlock(env, mdd_obj);
799         return rc;
800 }
801
802 /*
803  * No permission check is needed.
804  */
805 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
806                         struct md_attr *ma)
807 {
808         struct mdd_object *mdd_obj = md2mdd_obj(obj);
809         int                rc;
810
811         ENTRY;
812         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
813         RETURN(rc);
814 }
815
816 /*
817  * No permission check is needed.
818  */
819 static int mdd_xattr_get(const struct lu_env *env,
820                          struct md_object *obj, struct lu_buf *buf,
821                          const char *name)
822 {
823         struct mdd_object *mdd_obj = md2mdd_obj(obj);
824         int rc;
825
826         ENTRY;
827
828         if (mdd_object_exists(mdd_obj) == 0) {
829                 CERROR("%s: object "DFID" not found: rc = -2\n",
830                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
831                 return -ENOENT;
832         }
833
834         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835         rc = mdo_xattr_get(env, mdd_obj, buf, name,
836                            mdd_object_capa(env, mdd_obj));
837         mdd_read_unlock(env, mdd_obj);
838
839         RETURN(rc);
840 }
841
842 /*
843  * Permission check is done when open,
844  * no need check again.
845  */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
847                         struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         struct dt_object  *next;
851         loff_t             pos = 0;
852         int                rc;
853         ENTRY;
854
855         if (mdd_object_exists(mdd_obj) == 0) {
856                 CERROR("%s: object "DFID" not found: rc = -2\n",
857                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
858                 return -ENOENT;
859         }
860
861         next = mdd_object_child(mdd_obj);
862         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
863         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
864                                          mdd_object_capa(env, mdd_obj));
865         mdd_read_unlock(env, mdd_obj);
866         RETURN(rc);
867 }
868
869 /*
870  * No permission check is needed.
871  */
872 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
873                           struct lu_buf *buf)
874 {
875         struct mdd_object *mdd_obj = md2mdd_obj(obj);
876         int rc;
877
878         ENTRY;
879
880         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
881         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
882         mdd_read_unlock(env, mdd_obj);
883
884         RETURN(rc);
885 }
886
887 int mdd_declare_object_create_internal(const struct lu_env *env,
888                                        struct mdd_object *p,
889                                        struct mdd_object *c,
890                                        struct md_attr *ma,
891                                        struct thandle *handle,
892                                        const struct md_op_spec *spec)
893 {
894         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
895         const struct dt_index_features *feat = spec->sp_feat;
896         int rc;
897         ENTRY;
898
899         if (feat != &dt_directory_features && feat != NULL)
900                 dof->dof_type = DFT_INDEX;
901         else
902                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
903
904         dof->u.dof_idx.di_feat = feat;
905
906         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
907
908         RETURN(rc);
909 }
910
911 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
912                                struct mdd_object *c, struct md_attr *ma,
913                                struct thandle *handle,
914                                const struct md_op_spec *spec)
915 {
916         struct lu_attr *attr = &ma->ma_attr;
917         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
918         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
919         const struct dt_index_features *feat = spec->sp_feat;
920         int rc;
921         ENTRY;
922
923         if (!mdd_object_exists(c)) {
924                 struct dt_object *next = mdd_object_child(c);
925                 LASSERT(next);
926
927                 if (feat != &dt_directory_features && feat != NULL)
928                         dof->dof_type = DFT_INDEX;
929                 else
930                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
931
932                 dof->u.dof_idx.di_feat = feat;
933
934                 /* @hint will be initialized by underlying device. */
935                 next->do_ops->do_ah_init(env, hint,
936                                          p ? mdd_object_child(p) : NULL,
937                                          attr->la_mode & S_IFMT);
938
939                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
940                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
941         } else
942                 rc = -EEXIST;
943
944         RETURN(rc);
945 }
946
947 /**
948  * Make sure the ctime is increased only.
949  */
950 static inline int mdd_attr_check(const struct lu_env *env,
951                                  struct mdd_object *obj,
952                                  struct lu_attr *attr)
953 {
954         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
955         int rc;
956         ENTRY;
957
958         if (attr->la_valid & LA_CTIME) {
959                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
960                 if (rc)
961                         RETURN(rc);
962
963                 if (attr->la_ctime < tmp_la->la_ctime)
964                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
965                 else if (attr->la_valid == LA_CTIME &&
966                          attr->la_ctime == tmp_la->la_ctime)
967                         attr->la_valid &= ~LA_CTIME;
968         }
969         RETURN(0);
970 }
971
972 int mdd_attr_set_internal(const struct lu_env *env,
973                           struct mdd_object *obj,
974                           struct lu_attr *attr,
975                           struct thandle *handle,
976                           int needacl)
977 {
978         int rc;
979         ENTRY;
980
981         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
982 #ifdef CONFIG_FS_POSIX_ACL
983         if (!rc && (attr->la_valid & LA_MODE) && needacl)
984                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
985 #endif
986         RETURN(rc);
987 }
988
989 int mdd_attr_check_set_internal(const struct lu_env *env,
990                                 struct mdd_object *obj,
991                                 struct lu_attr *attr,
992                                 struct thandle *handle,
993                                 int needacl)
994 {
995         int rc;
996         ENTRY;
997
998         rc = mdd_attr_check(env, obj, attr);
999         if (rc)
1000                 RETURN(rc);
1001
1002         if (attr->la_valid)
1003                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1004         RETURN(rc);
1005 }
1006
1007 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1008                                         struct mdd_object *obj,
1009                                         struct lu_attr *attr,
1010                                         struct thandle *handle,
1011                                         int needacl)
1012 {
1013         int rc;
1014         ENTRY;
1015
1016         needacl = needacl && (attr->la_valid & LA_MODE);
1017         if (needacl)
1018                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1019         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1020         if (needacl)
1021                 mdd_write_unlock(env, obj);
1022         RETURN(rc);
1023 }
1024
1025 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1026                                        struct mdd_object *obj,
1027                                        struct lu_attr *attr,
1028                                        struct thandle *handle,
1029                                        int needacl)
1030 {
1031         int rc;
1032         ENTRY;
1033
1034         needacl = needacl && (attr->la_valid & LA_MODE);
1035         if (needacl)
1036                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1037         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1038         if (needacl)
1039                 mdd_write_unlock(env, obj);
1040         RETURN(rc);
1041 }
1042
1043 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1044                     const struct lu_buf *buf, const char *name,
1045                     int fl, struct thandle *handle)
1046 {
1047         struct lustre_capa *capa = mdd_object_capa(env, obj);
1048         int rc = -EINVAL;
1049         ENTRY;
1050
1051         if (buf->lb_buf && buf->lb_len > 0)
1052                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1053         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1054                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1055
1056         RETURN(rc);
1057 }
1058
1059 /*
1060  * This gives the same functionality as the code between
1061  * sys_chmod and inode_setattr
1062  * chown_common and inode_setattr
1063  * utimes and inode_setattr
1064  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1065  */
1066 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1067                         struct lu_attr *la, const struct md_attr *ma)
1068 {
1069         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1070         struct md_ucred  *uc;
1071         int               rc;
1072         ENTRY;
1073
1074         if (!la->la_valid)
1075                 RETURN(0);
1076
1077         /* Do not permit change file type */
1078         if (la->la_valid & LA_TYPE)
1079                 RETURN(-EPERM);
1080
1081         /* They should not be processed by setattr */
1082         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1083                 RETURN(-EPERM);
1084
1085         /* export destroy does not have ->le_ses, but we may want
1086          * to drop LUSTRE_SOM_FL. */
1087         if (!env->le_ses)
1088                 RETURN(0);
1089
1090         uc = md_ucred(env);
1091
1092         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1093         if (rc)
1094                 RETURN(rc);
1095
1096         if (la->la_valid == LA_CTIME) {
1097                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1098                         /* This is only for set ctime when rename's source is
1099                          * on remote MDS. */
1100                         rc = mdd_may_delete(env, NULL, obj,
1101                                             (struct md_attr *)ma, 1, 0);
1102                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1103                         la->la_valid &= ~LA_CTIME;
1104                 RETURN(rc);
1105         }
1106
1107         if (la->la_valid == LA_ATIME) {
1108                 /* This is atime only set for read atime update on close. */
1109                 if (la->la_atime >= tmp_la->la_atime &&
1110                     la->la_atime < (tmp_la->la_atime +
1111                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1112                         la->la_valid &= ~LA_ATIME;
1113                 RETURN(0);
1114         }
1115
1116         /* Check if flags change. */
1117         if (la->la_valid & LA_FLAGS) {
1118                 unsigned int oldflags = 0;
1119                 unsigned int newflags = la->la_flags &
1120                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1121
1122                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1123                     !mdd_capable(uc, CFS_CAP_FOWNER))
1124                         RETURN(-EPERM);
1125
1126                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1127                  * only be changed by the relevant capability. */
1128                 if (mdd_is_immutable(obj))
1129                         oldflags |= LUSTRE_IMMUTABLE_FL;
1130                 if (mdd_is_append(obj))
1131                         oldflags |= LUSTRE_APPEND_FL;
1132                 if ((oldflags ^ newflags) &&
1133                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1134                         RETURN(-EPERM);
1135
1136                 if (!S_ISDIR(tmp_la->la_mode))
1137                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1138         }
1139
1140         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1141             (la->la_valid & ~LA_FLAGS) &&
1142             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1143                 RETURN(-EPERM);
1144
1145         /* Check for setting the obj time. */
1146         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1147             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1148                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1149                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1150                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1151                                                             MAY_WRITE,
1152                                                             MOR_TGT_CHILD);
1153                         if (rc)
1154                                 RETURN(rc);
1155                 }
1156         }
1157
1158         if (la->la_valid & LA_KILL_SUID) {
1159                 la->la_valid &= ~LA_KILL_SUID;
1160                 if ((tmp_la->la_mode & S_ISUID) &&
1161                     !(la->la_valid & LA_MODE)) {
1162                         la->la_mode = tmp_la->la_mode;
1163                         la->la_valid |= LA_MODE;
1164                 }
1165                 la->la_mode &= ~S_ISUID;
1166         }
1167
1168         if (la->la_valid & LA_KILL_SGID) {
1169                 la->la_valid &= ~LA_KILL_SGID;
1170                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1171                                         (S_ISGID | S_IXGRP)) &&
1172                     !(la->la_valid & LA_MODE)) {
1173                         la->la_mode = tmp_la->la_mode;
1174                         la->la_valid |= LA_MODE;
1175                 }
1176                 la->la_mode &= ~S_ISGID;
1177         }
1178
1179         /* Make sure a caller can chmod. */
1180         if (la->la_valid & LA_MODE) {
1181                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1182                     (uc->mu_fsuid != tmp_la->la_uid) &&
1183                     !mdd_capable(uc, CFS_CAP_FOWNER))
1184                         RETURN(-EPERM);
1185
1186                 if (la->la_mode == (cfs_umode_t) -1)
1187                         la->la_mode = tmp_la->la_mode;
1188                 else
1189                         la->la_mode = (la->la_mode & S_IALLUGO) |
1190                                       (tmp_la->la_mode & ~S_IALLUGO);
1191
1192                 /* Also check the setgid bit! */
1193                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1194                                        la->la_gid : tmp_la->la_gid) &&
1195                     !mdd_capable(uc, CFS_CAP_FSETID))
1196                         la->la_mode &= ~S_ISGID;
1197         } else {
1198                la->la_mode = tmp_la->la_mode;
1199         }
1200
1201         /* Make sure a caller can chown. */
1202         if (la->la_valid & LA_UID) {
1203                 if (la->la_uid == (uid_t) -1)
1204                         la->la_uid = tmp_la->la_uid;
1205                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1206                     (la->la_uid != tmp_la->la_uid)) &&
1207                     !mdd_capable(uc, CFS_CAP_CHOWN))
1208                         RETURN(-EPERM);
1209
1210                 /* If the user or group of a non-directory has been
1211                  * changed by a non-root user, remove the setuid bit.
1212                  * 19981026 David C Niemi <niemi@tux.org>
1213                  *
1214                  * Changed this to apply to all users, including root,
1215                  * to avoid some races. This is the behavior we had in
1216                  * 2.0. The check for non-root was definitely wrong
1217                  * for 2.2 anyway, as it should have been using
1218                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1219                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1220                     !S_ISDIR(tmp_la->la_mode)) {
1221                         la->la_mode &= ~S_ISUID;
1222                         la->la_valid |= LA_MODE;
1223                 }
1224         }
1225
1226         /* Make sure caller can chgrp. */
1227         if (la->la_valid & LA_GID) {
1228                 if (la->la_gid == (gid_t) -1)
1229                         la->la_gid = tmp_la->la_gid;
1230                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1231                     ((la->la_gid != tmp_la->la_gid) &&
1232                     !lustre_in_group_p(uc, la->la_gid))) &&
1233                     !mdd_capable(uc, CFS_CAP_CHOWN))
1234                         RETURN(-EPERM);
1235
1236                 /* Likewise, if the user or group of a non-directory
1237                  * has been changed by a non-root user, remove the
1238                  * setgid bit UNLESS there is no group execute bit
1239                  * (this would be a file marked for mandatory
1240                  * locking).  19981026 David C Niemi <niemi@tux.org>
1241                  *
1242                  * Removed the fsuid check (see the comment above) --
1243                  * 19990830 SD. */
1244                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1245                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1246                         la->la_mode &= ~S_ISGID;
1247                         la->la_valid |= LA_MODE;
1248                 }
1249         }
1250
1251         /* For both Size-on-MDS case and truncate case,
1252          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1253          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1254          * For SOM case, it is true, the MAY_WRITE perm has been checked
1255          * when open, no need check again. For truncate case, it is false,
1256          * the MAY_WRITE perm should be checked here. */
1257         if (ma->ma_attr_flags & MDS_SOM) {
1258                 /* For the "Size-on-MDS" setattr update, merge coming
1259                  * attributes with the set in the inode. BUG 10641 */
1260                 if ((la->la_valid & LA_ATIME) &&
1261                     (la->la_atime <= tmp_la->la_atime))
1262                         la->la_valid &= ~LA_ATIME;
1263
1264                 /* OST attributes do not have a priority over MDS attributes,
1265                  * so drop times if ctime is equal. */
1266                 if ((la->la_valid & LA_CTIME) &&
1267                     (la->la_ctime <= tmp_la->la_ctime))
1268                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1269         } else {
1270                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1271                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1272                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1273                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1274                                 rc = mdd_permission_internal_locked(env, obj,
1275                                                             tmp_la, MAY_WRITE,
1276                                                             MOR_TGT_CHILD);
1277                                 if (rc)
1278                                         RETURN(rc);
1279                         }
1280                 }
1281                 if (la->la_valid & LA_CTIME) {
1282                         /* The pure setattr, it has the priority over what is
1283                          * already set, do not drop it if ctime is equal. */
1284                         if (la->la_ctime < tmp_la->la_ctime)
1285                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1286                                                   LA_CTIME);
1287                 }
1288         }
1289
1290         RETURN(0);
1291 }
1292
1293 /** Store a data change changelog record
1294  * If this fails, we must fail the whole transaction; we don't
1295  * want the change to commit without the log entry.
1296  * \param mdd_obj - mdd_object of change
1297  * \param handle - transacion handle
1298  */
1299 static int mdd_changelog_data_store(const struct lu_env     *env,
1300                                     struct mdd_device       *mdd,
1301                                     enum changelog_rec_type type,
1302                                     int                     flags,
1303                                     struct mdd_object       *mdd_obj,
1304                                     struct thandle          *handle)
1305 {
1306         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1307         struct llog_changelog_rec *rec;
1308         struct thandle *th = NULL;
1309         struct lu_buf *buf;
1310         int reclen;
1311         int rc;
1312
1313         /* Not recording */
1314         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1315                 RETURN(0);
1316         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1317                 RETURN(0);
1318
1319         LASSERT(mdd_obj != NULL);
1320         LASSERT(handle != NULL);
1321
1322         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1323             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1324                 /* Don't need multiple updates in this log */
1325                 /* Don't check under lock - no big deal if we get an extra
1326                    entry */
1327                 RETURN(0);
1328         }
1329
1330         reclen = llog_data_len(sizeof(*rec));
1331         buf = mdd_buf_alloc(env, reclen);
1332         if (buf->lb_buf == NULL)
1333                 RETURN(-ENOMEM);
1334         rec = (struct llog_changelog_rec *)buf->lb_buf;
1335
1336         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1337         rec->cr.cr_type = (__u32)type;
1338         rec->cr.cr_tfid = *tfid;
1339         rec->cr.cr_namelen = 0;
1340         mdd_obj->mod_cltime = cfs_time_current_64();
1341
1342         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1343
1344         if (th)
1345                 mdd_trans_stop(env, mdd, rc, th);
1346
1347         if (rc < 0) {
1348                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1349                        rc, type, PFID(tfid));
1350                 return -EFAULT;
1351         }
1352
1353         return 0;
1354 }
1355
1356 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1357                   int flags, struct md_object *obj)
1358 {
1359         struct thandle *handle;
1360         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1361         struct mdd_device *mdd = mdo2mdd(obj);
1362         int rc;
1363         ENTRY;
1364
1365         handle = mdd_trans_create(env, mdd);
1366         if (IS_ERR(handle))
1367                 return(PTR_ERR(handle));
1368
1369         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1370         if (rc)
1371                 GOTO(stop, rc);
1372
1373         rc = mdd_trans_start(env, mdd, handle);
1374         if (rc)
1375                 GOTO(stop, rc);
1376
1377         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1378                                       handle);
1379
1380 stop:
1381         mdd_trans_stop(env, mdd, rc, handle);
1382
1383         RETURN(rc);
1384 }
1385
1386 /**
1387  * Should be called with write lock held.
1388  *
1389  * \see mdd_lma_set_locked().
1390  */
1391 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1392                        const struct md_attr *ma, struct thandle *handle)
1393 {
1394         struct mdd_thread_info *info = mdd_env_info(env);
1395         struct lu_buf *buf;
1396         struct lustre_mdt_attrs *lma =
1397                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1398         int lmasize = sizeof(struct lustre_mdt_attrs);
1399         int rc = 0;
1400
1401         ENTRY;
1402
1403         /* Either HSM or SOM part is not valid, we need to read it before */
1404         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1405                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1406                 if (rc <= 0)
1407                         RETURN(rc);
1408
1409                 lustre_lma_swab(lma);
1410         } else {
1411                 memset(lma, 0, lmasize);
1412         }
1413
1414         /* Copy HSM data */
1415         if (ma->ma_valid & MA_HSM) {
1416                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1417                 lma->lma_compat |= LMAC_HSM;
1418         }
1419
1420         /* Copy SOM data */
1421         if (ma->ma_valid & MA_SOM) {
1422                 LASSERT(ma->ma_som != NULL);
1423                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1424                         lma->lma_compat     &= ~LMAC_SOM;
1425                 } else {
1426                         lma->lma_compat     |= LMAC_SOM;
1427                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1428                         lma->lma_som_size    = ma->ma_som->msd_size;
1429                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1430                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1431                 }
1432         }
1433
1434         /* Copy FID */
1435         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1436
1437         lustre_lma_swab(lma);
1438         buf = mdd_buf_get(env, lma, lmasize);
1439         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1440
1441         RETURN(rc);
1442 }
1443
1444 /**
1445  * Save LMA extended attributes with data from \a ma.
1446  *
1447  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1448  * not, LMA EA will be first read from disk, modified and write back.
1449  *
1450  */
1451 static int mdd_lma_set_locked(const struct lu_env *env,
1452                               struct mdd_object *mdd_obj,
1453                               const struct md_attr *ma, struct thandle *handle)
1454 {
1455         int rc;
1456
1457         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1458         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1459         mdd_write_unlock(env, mdd_obj);
1460         return rc;
1461 }
1462
1463 /* Precedence for choosing record type when multiple
1464  * attributes change: setattr > mtime > ctime > atime
1465  * (ctime changes when mtime does, plus chmod/chown.
1466  * atime and ctime are independent.) */
1467 static int mdd_attr_set_changelog(const struct lu_env *env,
1468                                   struct md_object *obj, struct thandle *handle,
1469                                   __u64 valid)
1470 {
1471         struct mdd_device *mdd = mdo2mdd(obj);
1472         int bits, type = 0;
1473
1474         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1475         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1476         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1477         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1478         bits = bits & mdd->mdd_cl.mc_mask;
1479         if (bits == 0)
1480                 return 0;
1481
1482         /* The record type is the lowest non-masked set bit */
1483         while (bits && ((bits & 1) == 0)) {
1484                 bits = bits >> 1;
1485                 type++;
1486         }
1487
1488         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1489         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1490                                         md2mdd_obj(obj), handle);
1491 }
1492
1493 static int mdd_declare_attr_set(const struct lu_env *env,
1494                                 struct mdd_device *mdd,
1495                                 struct mdd_object *obj,
1496                                 const struct md_attr *ma,
1497                                 struct lov_mds_md *lmm,
1498                                 struct thandle *handle)
1499 {
1500         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1501         int             rc, i;
1502
1503         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1504         if (rc)
1505                 return rc;
1506
1507         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1508         if (rc)
1509                 return rc;
1510
1511         if (ma->ma_valid & MA_LOV) {
1512                 buf->lb_buf = NULL;
1513                 buf->lb_len = ma->ma_lmm_size;
1514                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1515                                            0, handle);
1516                 if (rc)
1517                         return rc;
1518         }
1519
1520         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1521                 buf->lb_buf = NULL;
1522                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1523                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1524                                            0, handle);
1525                 if (rc)
1526                         return rc;
1527         }
1528
1529 #ifdef CONFIG_FS_POSIX_ACL
1530         if (ma->ma_attr.la_valid & LA_MODE) {
1531                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1532                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1533                                    BYPASS_CAPA);
1534                 mdd_read_unlock(env, obj);
1535                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1536                         rc = 0;
1537                 else if (rc < 0)
1538                         return rc;
1539
1540                 if (rc != 0) {
1541                         buf->lb_buf = NULL;
1542                         buf->lb_len = rc;
1543                         rc = mdo_declare_xattr_set(env, obj, buf,
1544                                                    XATTR_NAME_ACL_ACCESS, 0,
1545                                                    handle);
1546                         if (rc)
1547                                 return rc;
1548                 }
1549         }
1550 #endif
1551
1552         /* basically the log is the same as in unlink case */
1553         if (lmm) {
1554                 __u16 stripe;
1555
1556                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1557                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1558                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1559                                mdd->mdd_obd_dev->obd_name,
1560                                le32_to_cpu(lmm->lmm_magic),
1561                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1562                         return -EINVAL;
1563                 }
1564
1565                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1566                 if (stripe == LOV_ALL_STRIPES) {
1567                         struct lov_desc *ldesc;
1568
1569                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1570                         LASSERT(ldesc != NULL);
1571                         stripe = ldesc->ld_tgt_count;
1572                 }
1573
1574                 for (i = 0; i < stripe; i++) {
1575                         rc = mdd_declare_llog_record(env, mdd,
1576                                         sizeof(struct llog_unlink_rec),
1577                                         handle);
1578                         if (rc)
1579                                 return rc;
1580                 }
1581         }
1582
1583         return rc;
1584 }
1585
1586 /* set attr and LOV EA at once, return updated attr */
1587 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1588                         const struct md_attr *ma)
1589 {
1590         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1591         struct mdd_device *mdd = mdo2mdd(obj);
1592         struct thandle *handle;
1593         struct lov_mds_md *lmm = NULL;
1594         struct llog_cookie *logcookies = NULL;
1595         int  rc, lmm_size = 0, cookie_size = 0;
1596         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1597         struct obd_device *obd = mdd->mdd_obd_dev;
1598         struct mds_obd *mds = &obd->u.mds;
1599 #ifdef HAVE_QUOTA_SUPPORT
1600         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1601         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1602         int quota_opc = 0, block_count = 0;
1603         int inode_pending[MAXQUOTAS] = { 0, 0 };
1604         int block_pending[MAXQUOTAS] = { 0, 0 };
1605 #endif
1606         ENTRY;
1607
1608         *la_copy = ma->ma_attr;
1609         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1610         if (rc != 0)
1611                 RETURN(rc);
1612
1613         /* setattr on "close" only change atime, or do nothing */
1614         if (ma->ma_valid == MA_INODE &&
1615             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1616                 RETURN(0);
1617
1618         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1619             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1620                 lmm_size = mdd_lov_mdsize(env, mdd);
1621                 lmm = mdd_max_lmm_get(env, mdd);
1622                 if (lmm == NULL)
1623                         RETURN(-ENOMEM);
1624
1625                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1626                                 XATTR_NAME_LOV);
1627
1628                 if (rc < 0)
1629                         RETURN(rc);
1630         }
1631
1632         handle = mdd_trans_create(env, mdd);
1633         if (IS_ERR(handle))
1634                 RETURN(PTR_ERR(handle));
1635
1636         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1637                                   lmm_size > 0 ? lmm : NULL, handle);
1638         if (rc)
1639                 GOTO(stop, rc);
1640
1641         /* permission changes may require sync operation */
1642         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1643                 handle->th_sync = !!mdd->mdd_sync_permission;
1644
1645         rc = mdd_trans_start(env, mdd, handle);
1646         if (rc)
1647                 GOTO(stop, rc);
1648
1649         /* permission changes may require sync operation */
1650         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1651                 handle->th_sync |= mdd->mdd_sync_permission;
1652
1653         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1654                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1655                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1656
1657 #ifdef HAVE_QUOTA_SUPPORT
1658         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1659                 struct obd_export *exp = md_quota(env)->mq_exp;
1660                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1661
1662                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1663                 if (!rc) {
1664                         quota_opc = FSFILT_OP_SETATTR;
1665                         mdd_quota_wrapper(la_copy, qnids);
1666                         mdd_quota_wrapper(la_tmp, qoids);
1667                         /* get file quota for new owner */
1668                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1669                                         qnids, inode_pending, 1, NULL, 0,
1670                                         NULL, 0);
1671                         block_count = (la_tmp->la_blocks + 7) >> 3;
1672                         if (block_count) {
1673                                 void *data = NULL;
1674                                 mdd_data_get(env, mdd_obj, &data);
1675                                 /* get block quota for new owner */
1676                                 lquota_chkquota(mds_quota_interface_ref, obd,
1677                                                 exp, qnids, block_pending,
1678                                                 block_count, NULL,
1679                                                 LQUOTA_FLAGS_BLK, data, 1);
1680                         }
1681                 }
1682         }
1683 #endif
1684
1685         if (la_copy->la_valid & LA_FLAGS) {
1686                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1687                                                   handle, 1);
1688                 if (rc == 0)
1689                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1690         } else if (la_copy->la_valid) {            /* setattr */
1691                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1692                                                   handle, 1);
1693                 /* journal chown/chgrp in llog, just like unlink */
1694                 if (rc == 0 && lmm_size){
1695                         cookie_size = mdd_lov_cookiesize(env, mdd);
1696                         logcookies = mdd_max_cookie_get(env, mdd);
1697                         if (logcookies == NULL)
1698                                 GOTO(cleanup, rc = -ENOMEM);
1699
1700                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1701                                             logcookies, cookie_size) <= 0)
1702                                 logcookies = NULL;
1703                 }
1704         }
1705
1706         if (rc == 0 && ma->ma_valid & MA_LOV) {
1707                 cfs_umode_t mode;
1708
1709                 mode = mdd_object_type(mdd_obj);
1710                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1711                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1712                         if (rc)
1713                                 GOTO(cleanup, rc);
1714
1715                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1716                                             ma->ma_lmm_size, handle, 1);
1717                 }
1718
1719         }
1720         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1721                 cfs_umode_t mode;
1722
1723                 mode = mdd_object_type(mdd_obj);
1724                 if (S_ISREG(mode))
1725                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1726
1727         }
1728 cleanup:
1729         if (rc == 0)
1730                 rc = mdd_attr_set_changelog(env, obj, handle,
1731                                             ma->ma_attr.la_valid);
1732 stop:
1733         mdd_trans_stop(env, mdd, rc, handle);
1734         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1735                 /*set obd attr, if needed*/
1736                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1737                                            logcookies);
1738         }
1739 #ifdef HAVE_QUOTA_SUPPORT
1740         if (quota_opc) {
1741                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1742                                       inode_pending, 0);
1743                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1744                                       block_pending, 1);
1745                 /* Trigger dqrel/dqacq for original owner and new owner.
1746                  * If failed, the next call for lquota_chkquota will
1747                  * process it. */
1748                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1749                               quota_opc);
1750         }
1751 #endif
1752         RETURN(rc);
1753 }
1754
1755 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1756                       const struct lu_buf *buf, const char *name, int fl,
1757                       struct thandle *handle)
1758 {
1759         int  rc;
1760         ENTRY;
1761
1762         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1763         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1764         mdd_write_unlock(env, obj);
1765
1766         RETURN(rc);
1767 }
1768
1769 static int mdd_xattr_sanity_check(const struct lu_env *env,
1770                                   struct mdd_object *obj)
1771 {
1772         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1773         struct md_ucred *uc     = md_ucred(env);
1774         int rc;
1775         ENTRY;
1776
1777         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1778                 RETURN(-EPERM);
1779
1780         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1781         if (rc)
1782                 RETURN(rc);
1783
1784         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1785             !mdd_capable(uc, CFS_CAP_FOWNER))
1786                 RETURN(-EPERM);
1787
1788         RETURN(rc);
1789 }
1790
1791 static int mdd_declare_xattr_set(const struct lu_env *env,
1792                                  struct mdd_device *mdd,
1793                                  struct mdd_object *obj,
1794                                  const struct lu_buf *buf,
1795                                  const char *name,
1796                                  struct thandle *handle)
1797
1798 {
1799         int rc;
1800
1801         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1802         if (rc)
1803                 return rc;
1804
1805         /* Only record user xattr changes */
1806         if ((strncmp("user.", name, 5) == 0))
1807                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1808
1809         return rc;
1810 }
1811
1812 /**
1813  * The caller should guarantee to update the object ctime
1814  * after xattr_set if needed.
1815  */
1816 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1817                          const struct lu_buf *buf, const char *name,
1818                          int fl)
1819 {
1820         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1821         struct mdd_device *mdd = mdo2mdd(obj);
1822         struct thandle *handle;
1823         int  rc;
1824         ENTRY;
1825
1826         rc = mdd_xattr_sanity_check(env, mdd_obj);
1827         if (rc)
1828                 RETURN(rc);
1829
1830         handle = mdd_trans_create(env, mdd);
1831         if (IS_ERR(handle))
1832                 RETURN(PTR_ERR(handle));
1833
1834         /* security-replated changes may require sync */
1835         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1836             mdd->mdd_sync_permission == 1)
1837                 handle->th_sync = 1;
1838
1839         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1840         if (rc)
1841                 GOTO(stop, rc);
1842
1843         rc = mdd_trans_start(env, mdd, handle);
1844         if (rc)
1845                 GOTO(stop, rc);
1846
1847         /* security-replated changes may require sync */
1848         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1849                 handle->th_sync |= mdd->mdd_sync_permission;
1850
1851         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1852
1853         /* Only record system & user xattr changes */
1854         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1855                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1856                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1857                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1858                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1859                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1860                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1861                                               handle);
1862
1863 stop:
1864         mdd_trans_stop(env, mdd, rc, handle);
1865
1866         RETURN(rc);
1867 }
1868
1869 static int mdd_declare_xattr_del(const struct lu_env *env,
1870                                  struct mdd_device *mdd,
1871                                  struct mdd_object *obj,
1872                                  const char *name,
1873                                  struct thandle *handle)
1874 {
1875         int rc;
1876
1877         rc = mdo_declare_xattr_del(env, obj, name, handle);
1878         if (rc)
1879                 return rc;
1880
1881         /* Only record user xattr changes */
1882         if ((strncmp("user.", name, 5) == 0))
1883                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1884
1885         return rc;
1886 }
1887
1888 /**
1889  * The caller should guarantee to update the object ctime
1890  * after xattr_set if needed.
1891  */
1892 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1893                   const char *name)
1894 {
1895         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1896         struct mdd_device *mdd = mdo2mdd(obj);
1897         struct thandle *handle;
1898         int  rc;
1899         ENTRY;
1900
1901         rc = mdd_xattr_sanity_check(env, mdd_obj);
1902         if (rc)
1903                 RETURN(rc);
1904
1905         handle = mdd_trans_create(env, mdd);
1906         if (IS_ERR(handle))
1907                 RETURN(PTR_ERR(handle));
1908
1909         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1910         if (rc)
1911                 GOTO(stop, rc);
1912
1913         rc = mdd_trans_start(env, mdd, handle);
1914         if (rc)
1915                 GOTO(stop, rc);
1916
1917         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1918         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1919                            mdd_object_capa(env, mdd_obj));
1920         mdd_write_unlock(env, mdd_obj);
1921
1922         /* Only record system & user xattr changes */
1923         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1924                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1925                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1926                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1927                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1928                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1929                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1930                                               handle);
1931
1932 stop:
1933         mdd_trans_stop(env, mdd, rc, handle);
1934
1935         RETURN(rc);
1936 }
1937
1938 /* partial unlink */
1939 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1940                        struct md_attr *ma)
1941 {
1942         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1943         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1944         struct mdd_device *mdd = mdo2mdd(obj);
1945         struct thandle *handle;
1946 #ifdef HAVE_QUOTA_SUPPORT
1947         struct obd_device *obd = mdd->mdd_obd_dev;
1948         struct mds_obd *mds = &obd->u.mds;
1949         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1950         int quota_opc = 0;
1951 #endif
1952         int rc;
1953         ENTRY;
1954
1955         /* XXX: this code won't be used ever:
1956          * DNE uses slightly different approach */
1957         LBUG();
1958
1959         /*
1960          * Check -ENOENT early here because we need to get object type
1961          * to calculate credits before transaction start
1962          */
1963         if (mdd_object_exists(mdd_obj) == 0) {
1964                 CERROR("%s: object "DFID" not found: rc = -2\n",
1965                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1966                 RETURN(-ENOENT);
1967         }
1968
1969         LASSERT(mdd_object_exists(mdd_obj) > 0);
1970
1971         handle = mdd_trans_create(env, mdd);
1972         if (IS_ERR(handle))
1973                 RETURN(-ENOMEM);
1974
1975         rc = mdd_trans_start(env, mdd, handle);
1976
1977         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1978
1979         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1980         if (rc)
1981                 GOTO(cleanup, rc);
1982
1983         mdo_ref_del(env, mdd_obj, handle);
1984
1985         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1986                 /* unlink dot */
1987                 mdo_ref_del(env, mdd_obj, handle);
1988         }
1989
1990         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1991         la_copy->la_ctime = ma->ma_attr.la_ctime;
1992
1993         la_copy->la_valid = LA_CTIME;
1994         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1995         if (rc)
1996                 GOTO(cleanup, rc);
1997
1998         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1999 #ifdef HAVE_QUOTA_SUPPORT
2000         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2001             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2002                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2003                 mdd_quota_wrapper(&ma->ma_attr, qids);
2004         }
2005 #endif
2006
2007
2008         EXIT;
2009 cleanup:
2010         mdd_write_unlock(env, mdd_obj);
2011         mdd_trans_stop(env, mdd, rc, handle);
2012 #ifdef HAVE_QUOTA_SUPPORT
2013         if (quota_opc)
2014                 /* Trigger dqrel on the owner of child. If failed,
2015                  * the next call for lquota_chkquota will process it */
2016                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2017                               quota_opc);
2018 #endif
2019         return rc;
2020 }
2021
2022 /* partial operation */
2023 static int mdd_oc_sanity_check(const struct lu_env *env,
2024                                struct mdd_object *obj,
2025                                struct md_attr *ma)
2026 {
2027         int rc;
2028         ENTRY;
2029
2030         switch (ma->ma_attr.la_mode & S_IFMT) {
2031         case S_IFREG:
2032         case S_IFDIR:
2033         case S_IFLNK:
2034         case S_IFCHR:
2035         case S_IFBLK:
2036         case S_IFIFO:
2037         case S_IFSOCK:
2038                 rc = 0;
2039                 break;
2040         default:
2041                 rc = -EINVAL;
2042                 break;
2043         }
2044         RETURN(rc);
2045 }
2046
2047 static int mdd_object_create(const struct lu_env *env,
2048                              struct md_object *obj,
2049                              const struct md_op_spec *spec,
2050                              struct md_attr *ma)
2051 {
2052
2053         struct mdd_device *mdd = mdo2mdd(obj);
2054         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2055         const struct lu_fid *pfid = spec->u.sp_pfid;
2056         struct thandle *handle;
2057 #ifdef HAVE_QUOTA_SUPPORT
2058         struct obd_device *obd = mdd->mdd_obd_dev;
2059         struct obd_export *exp = md_quota(env)->mq_exp;
2060         struct mds_obd *mds = &obd->u.mds;
2061         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2062         int quota_opc = 0, block_count = 0;
2063         int inode_pending[MAXQUOTAS] = { 0, 0 };
2064         int block_pending[MAXQUOTAS] = { 0, 0 };
2065 #endif
2066         int rc = 0;
2067         ENTRY;
2068
2069         /* XXX: this code won't be used ever:
2070          * DNE uses slightly different approach */
2071         LBUG();
2072
2073 #ifdef HAVE_QUOTA_SUPPORT
2074         if (mds->mds_quota) {
2075                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2076                 mdd_quota_wrapper(&ma->ma_attr, qids);
2077                 /* get file quota for child */
2078                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2079                                 qids, inode_pending, 1, NULL, 0,
2080                                 NULL, 0);
2081                 switch (ma->ma_attr.la_mode & S_IFMT) {
2082                 case S_IFLNK:
2083                 case S_IFDIR:
2084                         block_count = 2;
2085                         break;
2086                 case S_IFREG:
2087                         block_count = 1;
2088                         break;
2089                 }
2090                 /* get block quota for child */
2091                 if (block_count)
2092                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2093                                         qids, block_pending, block_count,
2094                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2095         }
2096 #endif
2097
2098         handle = mdd_trans_create(env, mdd);
2099         if (IS_ERR(handle))
2100                 GOTO(out_pending, rc = PTR_ERR(handle));
2101
2102         rc = mdd_trans_start(env, mdd, handle);
2103
2104         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2105         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2106         if (rc)
2107                 GOTO(unlock, rc);
2108
2109         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2110         if (rc)
2111                 GOTO(unlock, rc);
2112
2113         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2114                 /* If creating the slave object, set slave EA here. */
2115                 int lmv_size = spec->u.sp_ea.eadatalen;
2116                 struct lmv_stripe_md *lmv;
2117
2118                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2119                 LASSERT(lmv != NULL && lmv_size > 0);
2120
2121                 rc = __mdd_xattr_set(env, mdd_obj,
2122                                      mdd_buf_get_const(env, lmv, lmv_size),
2123                                      XATTR_NAME_LMV, 0, handle);
2124                 if (rc)
2125                         GOTO(unlock, rc);
2126
2127                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2128                                            handle, 0);
2129         } else {
2130 #ifdef CONFIG_FS_POSIX_ACL
2131                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2132                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2133
2134                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2135                         buf->lb_len = spec->u.sp_ea.eadatalen;
2136                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2137                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2138                                                     &ma->ma_attr.la_mode,
2139                                                     handle);
2140                                 if (rc)
2141                                         GOTO(unlock, rc);
2142                                 else
2143                                         ma->ma_attr.la_valid |= LA_MODE;
2144                         }
2145
2146                         pfid = spec->u.sp_ea.fid;
2147                 }
2148 #endif
2149                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2150                                            spec);
2151         }
2152         EXIT;
2153 unlock:
2154         if (rc == 0)
2155                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2156         mdd_write_unlock(env, mdd_obj);
2157
2158         mdd_trans_stop(env, mdd, rc, handle);
2159 out_pending:
2160 #ifdef HAVE_QUOTA_SUPPORT
2161         if (quota_opc) {
2162                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2163                                       inode_pending, 0);
2164                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2165                                       block_pending, 1);
2166                 /* Trigger dqacq on the owner of child. If failed,
2167                  * the next call for lquota_chkquota will process it. */
2168                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2169                               quota_opc);
2170         }
2171 #endif
2172         return rc;
2173 }
2174
2175 /* partial link */
2176 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2177                        const struct md_attr *ma)
2178 {
2179         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2180         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2181         struct mdd_device *mdd = mdo2mdd(obj);
2182         struct thandle *handle;
2183         int rc;
2184         ENTRY;
2185
2186         /* XXX: this code won't be used ever:
2187          * DNE uses slightly different approach */
2188         LBUG();
2189
2190         handle = mdd_trans_create(env, mdd);
2191         if (IS_ERR(handle))
2192                 RETURN(-ENOMEM);
2193
2194         rc = mdd_trans_start(env, mdd, handle);
2195
2196         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2197         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2198         if (rc == 0)
2199                 mdo_ref_add(env, mdd_obj, handle);
2200         mdd_write_unlock(env, mdd_obj);
2201         if (rc == 0) {
2202                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2203                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2204
2205                 la_copy->la_valid = LA_CTIME;
2206                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2207                                                         handle, 0);
2208         }
2209         mdd_trans_stop(env, mdd, 0, handle);
2210
2211         RETURN(rc);
2212 }
2213
2214 /*
2215  * do NOT or the MAY_*'s, you'll get the weakest
2216  */
2217 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2218 {
2219         int res = 0;
2220
2221         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2222          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2223          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2224          * owner can write to a file even if it is marked readonly to hide
2225          * its brokenness. (bug 5781) */
2226         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2227                 struct md_ucred *uc = md_ucred(env);
2228
2229                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2230                     (la->la_uid == uc->mu_fsuid))
2231                         return 0;
2232         }
2233
2234         if (flags & FMODE_READ)
2235                 res |= MAY_READ;
2236         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2237                 res |= MAY_WRITE;
2238         if (flags & MDS_FMODE_EXEC)
2239                 res = MAY_EXEC;
2240         return res;
2241 }
2242
2243 static int mdd_open_sanity_check(const struct lu_env *env,
2244                                  struct mdd_object *obj, int flag)
2245 {
2246         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2247         int mode, rc;
2248         ENTRY;
2249
2250         /* EEXIST check */
2251         if (mdd_is_dead_obj(obj))
2252                 RETURN(-ENOENT);
2253
2254         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2255         if (rc)
2256                RETURN(rc);
2257
2258         if (S_ISLNK(tmp_la->la_mode))
2259                 RETURN(-ELOOP);
2260
2261         mode = accmode(env, tmp_la, flag);
2262
2263         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2264                 RETURN(-EISDIR);
2265
2266         if (!(flag & MDS_OPEN_CREATED)) {
2267                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2268                 if (rc)
2269                         RETURN(rc);
2270         }
2271
2272         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2273             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2274                 flag &= ~MDS_OPEN_TRUNC;
2275
2276         /* For writing append-only file must open it with append mode. */
2277         if (mdd_is_append(obj)) {
2278                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2279                         RETURN(-EPERM);
2280                 if (flag & MDS_OPEN_TRUNC)
2281                         RETURN(-EPERM);
2282         }
2283
2284 #if 0
2285         /*
2286          * Now, flag -- O_NOATIME does not be packed by client.
2287          */
2288         if (flag & O_NOATIME) {
2289                 struct md_ucred *uc = md_ucred(env);
2290
2291                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2292                     (uc->mu_valid == UCRED_NEW)) &&
2293                     (uc->mu_fsuid != tmp_la->la_uid) &&
2294                     !mdd_capable(uc, CFS_CAP_FOWNER))
2295                         RETURN(-EPERM);
2296         }
2297 #endif
2298
2299         RETURN(0);
2300 }
2301
2302 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2303                     int flags)
2304 {
2305         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2306         int rc = 0;
2307
2308         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2309
2310         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2311         if (rc == 0)
2312                 mdd_obj->mod_count++;
2313
2314         mdd_write_unlock(env, mdd_obj);
2315         return rc;
2316 }
2317
2318 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2319                             struct md_attr *ma, struct thandle *handle)
2320 {
2321         int rc;
2322
2323         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2324         if (rc)
2325                 return rc;
2326
2327         return mdo_declare_destroy(env, obj, handle);
2328 }
2329
2330 /* return md_attr back,
2331  * if it is last unlink then return lov ea + llog cookie*/
2332 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2333                     struct md_attr *ma, struct thandle *handle)
2334 {
2335         int rc = 0;
2336         ENTRY;
2337
2338         if (S_ISREG(mdd_object_type(obj))) {
2339                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2340                  * Caller must be ready for that. */
2341
2342                 rc = __mdd_lmm_get(env, obj, ma);
2343                 if ((ma->ma_valid & MA_LOV))
2344                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2345                                             obj, ma);
2346         }
2347
2348         if (rc == 0)
2349                 rc = mdo_destroy(env, obj, handle);
2350
2351         RETURN(rc);
2352 }
2353
2354 static int mdd_declare_close(const struct lu_env *env,
2355                              struct mdd_object *obj,
2356                              struct md_attr *ma,
2357                              struct thandle *handle)
2358 {
2359         int rc;
2360
2361         rc = orph_declare_index_delete(env, obj, handle);
2362         if (rc)
2363                 return rc;
2364
2365         return mdd_declare_object_kill(env, obj, ma, handle);
2366 }
2367
2368 /*
2369  * No permission check is needed.
2370  */
2371 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2372                      struct md_attr *ma, int mode)
2373 {
2374         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2375         struct mdd_device *mdd = mdo2mdd(obj);
2376         struct thandle    *handle = NULL;
2377         int rc;
2378         int is_orphan = 0, reset = 1;
2379
2380 #ifdef HAVE_QUOTA_SUPPORT
2381         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2382         struct mds_obd *mds = &obd->u.mds;
2383         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2384         int quota_opc = 0;
2385 #endif
2386         ENTRY;
2387
2388         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2389                 mdd_obj->mod_count--;
2390
2391                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2392                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2393                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2394                 RETURN(0);
2395         }
2396
2397         /* check without any lock */
2398         if (mdd_obj->mod_count == 1 &&
2399             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2400  again:
2401                 handle = mdd_trans_create(env, mdo2mdd(obj));
2402                 if (IS_ERR(handle))
2403                         RETURN(PTR_ERR(handle));
2404
2405                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2406                 if (rc)
2407                         GOTO(stop, rc);
2408
2409                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2410                 if (rc)
2411                         GOTO(stop, rc);
2412
2413                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2414                 if (rc)
2415                         GOTO(stop, rc);
2416         }
2417
2418         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2419         if (handle == NULL && mdd_obj->mod_count == 1 &&
2420             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2421                 mdd_write_unlock(env, mdd_obj);
2422                 goto again;
2423         }
2424
2425         /* release open count */
2426         mdd_obj->mod_count --;
2427
2428         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2429                 /* remove link to object from orphan index */
2430                 LASSERT(handle != NULL);
2431                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2432                 if (rc == 0) {
2433                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2434                                "list, OSS objects to be destroyed.\n",
2435                                PFID(mdd_object_fid(mdd_obj)));
2436                         is_orphan = 1;
2437                 } else {
2438                         CERROR("Object "DFID" can not be deleted from orphan "
2439                                 "list, maybe cause OST objects can not be "
2440                                 "destroyed (err: %d).\n",
2441                                 PFID(mdd_object_fid(mdd_obj)), rc);
2442                         /* If object was not deleted from orphan list, do not
2443                          * destroy OSS objects, which will be done when next
2444                          * recovery. */
2445                         GOTO(out, rc);
2446                 }
2447         }
2448
2449         rc = mdd_iattr_get(env, mdd_obj, ma);
2450         /* Object maybe not in orphan list originally, it is rare case for
2451          * mdd_finish_unlink() failure. */
2452         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2453 #ifdef HAVE_QUOTA_SUPPORT
2454                 if (mds->mds_quota) {
2455                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2456                         mdd_quota_wrapper(&ma->ma_attr, qids);
2457                 }
2458 #endif
2459                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2460                 if (ma->ma_valid & MA_FLAGS &&
2461                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2462                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2463                 } else {
2464                         if (handle == NULL) {
2465                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2466                                 if (IS_ERR(handle))
2467                                         GOTO(out, rc = PTR_ERR(handle));
2468
2469                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2470                                                              handle);
2471                                 if (rc)
2472                                         GOTO(out, rc);
2473
2474                                 rc = mdd_declare_changelog_store(env, mdd,
2475                                                                  NULL, handle);
2476                                 if (rc)
2477                                         GOTO(stop, rc);
2478
2479                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2480                                 if (rc)
2481                                         GOTO(out, rc);
2482                         }
2483
2484                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2485                         if (rc == 0)
2486                                 reset = 0;
2487                 }
2488
2489                 if (rc != 0)
2490                         CERROR("Error when prepare to delete Object "DFID" , "
2491                                "which will cause OST objects can not be "
2492                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2493         }
2494         EXIT;
2495
2496 out:
2497         if (reset)
2498                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2499
2500         mdd_write_unlock(env, mdd_obj);
2501
2502         if (rc == 0 &&
2503             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2504             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2505                 if (handle == NULL) {
2506                         handle = mdd_trans_create(env, mdo2mdd(obj));
2507                         if (IS_ERR(handle))
2508                                 GOTO(stop, rc = IS_ERR(handle));
2509
2510                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2511                                                          handle);
2512                         if (rc)
2513                                 GOTO(stop, rc);
2514
2515                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2516                         if (rc)
2517                                 GOTO(stop, rc);
2518                 }
2519
2520                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2521                                          mdd_obj, handle);
2522         }
2523
2524 stop:
2525         if (handle != NULL)
2526                 mdd_trans_stop(env, mdd, rc, handle);
2527 #ifdef HAVE_QUOTA_SUPPORT
2528         if (quota_opc)
2529                 /* Trigger dqrel on the owner of child. If failed,
2530                  * the next call for lquota_chkquota will process it */
2531                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2532                               quota_opc);
2533 #endif
2534         return rc;
2535 }
2536
2537 /*
2538  * Permission check is done when open,
2539  * no need check again.
2540  */
2541 static int mdd_readpage_sanity_check(const struct lu_env *env,
2542                                      struct mdd_object *obj)
2543 {
2544         struct dt_object *next = mdd_object_child(obj);
2545         int rc;
2546         ENTRY;
2547
2548         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2549                 rc = 0;
2550         else
2551                 rc = -ENOTDIR;
2552
2553         RETURN(rc);
2554 }
2555
2556 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2557                               struct lu_dirpage *dp, int nob,
2558                               const struct dt_it_ops *iops, struct dt_it *it,
2559                               __u32 attr)
2560 {
2561         void                   *area = dp;
2562         int                     result;
2563         __u64                   hash = 0;
2564         struct lu_dirent       *ent;
2565         struct lu_dirent       *last = NULL;
2566         int                     first = 1;
2567
2568         memset(area, 0, sizeof (*dp));
2569         area += sizeof (*dp);
2570         nob  -= sizeof (*dp);
2571
2572         ent  = area;
2573         do {
2574                 int    len;
2575                 int    recsize;
2576
2577                 len = iops->key_size(env, it);
2578
2579                 /* IAM iterator can return record with zero len. */
2580                 if (len == 0)
2581                         goto next;
2582
2583                 hash = iops->store(env, it);
2584                 if (unlikely(first)) {
2585                         first = 0;
2586                         dp->ldp_hash_start = cpu_to_le64(hash);
2587                 }
2588
2589                 /* calculate max space required for lu_dirent */
2590                 recsize = lu_dirent_calc_size(len, attr);
2591
2592                 if (nob >= recsize) {
2593                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2594                         if (result == -ESTALE)
2595                                 goto next;
2596                         if (result != 0)
2597                                 goto out;
2598
2599                         /* osd might not able to pack all attributes,
2600                          * so recheck rec length */
2601                         recsize = le16_to_cpu(ent->lde_reclen);
2602                 } else {
2603                         result = (last != NULL) ? 0 :-EINVAL;
2604                         goto out;
2605                 }
2606                 last = ent;
2607                 ent = (void *)ent + recsize;
2608                 nob -= recsize;
2609
2610 next:
2611                 result = iops->next(env, it);
2612                 if (result == -ESTALE)
2613                         goto next;
2614         } while (result == 0);
2615
2616 out:
2617         dp->ldp_hash_end = cpu_to_le64(hash);
2618         if (last != NULL) {
2619                 if (last->lde_hash == dp->ldp_hash_end)
2620                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2621                 last->lde_reclen = 0; /* end mark */
2622         }
2623         return result;
2624 }
2625
2626 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2627                           const struct lu_rdpg *rdpg)
2628 {
2629         struct dt_it      *it;
2630         struct dt_object  *next = mdd_object_child(obj);
2631         const struct dt_it_ops  *iops;
2632         struct page       *pg;
2633         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2634         int i;
2635         int nlupgs = 0;
2636         int rc;
2637         int nob;
2638
2639         LASSERT(rdpg->rp_pages != NULL);
2640         LASSERT(next->do_index_ops != NULL);
2641
2642         if (rdpg->rp_count <= 0)
2643                 return -EFAULT;
2644
2645         /*
2646          * iterate through directory and fill pages from @rdpg
2647          */
2648         iops = &next->do_index_ops->dio_it;
2649         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2650         if (IS_ERR(it))
2651                 return PTR_ERR(it);
2652
2653         rc = iops->load(env, it, rdpg->rp_hash);
2654
2655         if (rc == 0) {
2656                 /*
2657                  * Iterator didn't find record with exactly the key requested.
2658                  *
2659                  * It is currently either
2660                  *
2661                  *     - positioned above record with key less than
2662                  *     requested---skip it.
2663                  *
2664                  *     - or not positioned at all (is in IAM_IT_SKEWED
2665                  *     state)---position it on the next item.
2666                  */
2667                 rc = iops->next(env, it);
2668         } else if (rc > 0)
2669                 rc = 0;
2670
2671         /*
2672          * At this point and across for-loop:
2673          *
2674          *  rc == 0 -> ok, proceed.
2675          *  rc >  0 -> end of directory.
2676          *  rc <  0 -> error.
2677          */
2678         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2679              i++, nob -= CFS_PAGE_SIZE) {
2680                 struct lu_dirpage *dp;
2681
2682                 LASSERT(i < rdpg->rp_npages);
2683                 pg = rdpg->rp_pages[i];
2684                 dp = cfs_kmap(pg);
2685 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2686 repeat:
2687 #endif
2688                 rc = mdd_dir_page_build(env, mdd, dp,
2689                                         min_t(int, nob, LU_PAGE_SIZE),
2690                                         iops, it, rdpg->rp_attrs);
2691                 if (rc > 0) {
2692                         /*
2693                          * end of directory.
2694                          */
2695                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2696                         nlupgs++;
2697                 } else if (rc < 0) {
2698                         CWARN("build page failed: %d!\n", rc);
2699                 } else {
2700                         nlupgs++;
2701 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2702                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2703                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2704                                 goto repeat;
2705 #endif
2706                 }
2707                 cfs_kunmap(pg);
2708         }
2709         if (rc >= 0) {
2710                 struct lu_dirpage *dp;
2711
2712                 dp = cfs_kmap(rdpg->rp_pages[0]);
2713                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2714                 if (nlupgs == 0) {
2715                         /*
2716                          * No pages were processed, mark this for first page
2717                          * and send back.
2718                          */
2719                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2720                         nlupgs = 1;
2721                 }
2722                 cfs_kunmap(rdpg->rp_pages[0]);
2723
2724                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2725         }
2726         iops->put(env, it);
2727         iops->fini(env, it);
2728
2729         return rc;
2730 }
2731
2732 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2733                  const struct lu_rdpg *rdpg)
2734 {
2735         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2736         int rc;
2737         ENTRY;
2738
2739         if (mdd_object_exists(mdd_obj) == 0) {
2740                 CERROR("%s: object "DFID" not found: rc = -2\n",
2741                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2742                 return -ENOENT;
2743         }
2744
2745         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2746         rc = mdd_readpage_sanity_check(env, mdd_obj);
2747         if (rc)
2748                 GOTO(out_unlock, rc);
2749
2750         if (mdd_is_dead_obj(mdd_obj)) {
2751                 struct page *pg;
2752                 struct lu_dirpage *dp;
2753
2754                 /*
2755                  * According to POSIX, please do not return any entry to client:
2756                  * even dot and dotdot should not be returned.
2757                  */
2758                 CWARN("readdir from dead object: "DFID"\n",
2759                         PFID(mdd_object_fid(mdd_obj)));
2760
2761                 if (rdpg->rp_count <= 0)
2762                         GOTO(out_unlock, rc = -EFAULT);
2763                 LASSERT(rdpg->rp_pages != NULL);
2764
2765                 pg = rdpg->rp_pages[0];
2766                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2767                 memset(dp, 0 , sizeof(struct lu_dirpage));
2768                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2769                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2770                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2771                 cfs_kunmap(pg);
2772                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2773         }
2774
2775         rc = __mdd_readpage(env, mdd_obj, rdpg);
2776
2777         EXIT;
2778 out_unlock:
2779         mdd_read_unlock(env, mdd_obj);
2780         return rc;
2781 }
2782
2783 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2784 {
2785         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2786         struct dt_object *next;
2787
2788         if (mdd_object_exists(mdd_obj) == 0) {
2789                 CERROR("%s: object "DFID" not found: rc = -2\n",
2790                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2791                 return -ENOENT;
2792         }
2793         next = mdd_object_child(mdd_obj);
2794         return next->do_ops->do_object_sync(env, next);
2795 }
2796
2797 const struct md_object_operations mdd_obj_ops = {
2798         .moo_permission    = mdd_permission,
2799         .moo_attr_get      = mdd_attr_get,
2800         .moo_attr_set      = mdd_attr_set,
2801         .moo_xattr_get     = mdd_xattr_get,
2802         .moo_xattr_set     = mdd_xattr_set,
2803         .moo_xattr_list    = mdd_xattr_list,
2804         .moo_xattr_del     = mdd_xattr_del,
2805         .moo_object_create = mdd_object_create,
2806         .moo_ref_add       = mdd_ref_add,
2807         .moo_ref_del       = mdd_ref_del,
2808         .moo_open          = mdd_open,
2809         .moo_close         = mdd_close,
2810         .moo_readpage      = mdd_readpage,
2811         .moo_readlink      = mdd_readlink,
2812         .moo_changelog     = mdd_changelog,
2813         .moo_capa_get      = mdd_capa_get,
2814         .moo_object_sync   = mdd_object_sync,
2815         .moo_path          = mdd_path,
2816         .moo_file_lock     = mdd_file_lock,
2817         .moo_file_unlock   = mdd_file_unlock,
2818 };