Whamcloud - gitweb
LU-8837 lustre: remove target declarations from obd.h
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdd/mdd_object.c
32  *
33  * Lustre Metadata Server (mdd) routines
34  *
35  * Author: Wang Di <wangdi@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44 #include <lprocfs_status.h>
45 /* fid_be_cpu(), fid_cpu_to_be(). */
46 #include <lustre_fid.h>
47 #include <lustre_idmap.h>
48 #include <uapi/linux/lustre/lustre_param.h>
49 #include <lustre_mds.h>
50
51 #include "mdd_internal.h"
52
53 static const struct lu_object_operations mdd_lu_obj_ops;
54
55 struct mdd_object_user {
56         struct list_head        mou_list;       /**< linked off mod_users */
57         u64                     mou_open_flags; /**< open mode by client */
58         __u64                   mou_uidgid;     /**< uid_gid on client */
59         int                     mou_opencount;  /**< # opened */
60         ktime_t                 mou_deniednext; /**< time of next access denied
61                                                  * notfication
62                                                  */
63 };
64
65 static int mdd_xattr_get(const struct lu_env *env,
66                          struct md_object *obj, struct lu_buf *buf,
67                          const char *name);
68
69 static int mdd_changelog_data_store_by_fid(const struct lu_env *env,
70                                            struct mdd_device *mdd,
71                                            enum changelog_rec_type type,
72                                            enum changelog_rec_flags clf_flags,
73                                            const struct lu_fid *fid,
74                                            const struct lu_fid *pfid,
75                                            const char *xattr_name,
76                                            struct thandle *handle);
77
78 static inline bool has_prefix(const char *str, const char *prefix);
79
80
81 static u32 flags_helper(u64 open_flags)
82 {
83         u32 open_mode = 0;
84
85         if (open_flags & MDS_FMODE_EXEC) {
86                 open_mode = MDS_FMODE_EXEC;
87         } else {
88                 if (open_flags & MDS_FMODE_READ)
89                         open_mode = MDS_FMODE_READ;
90                 if (open_flags &
91                     (MDS_FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
92                         open_mode |= MDS_FMODE_WRITE;
93         }
94
95         return open_mode;
96 }
97
98 /** Allocate/init a user and its sub-structures.
99  *
100  * \param flags [IN]
101  * \param uid [IN]
102  * \param gid [IN]
103  * \retval mou [OUT] success valid structure
104  * \retval mou [OUT]
105  */
106 static struct mdd_object_user *mdd_obj_user_alloc(u64 open_flags,
107                                                   uid_t uid, gid_t gid)
108 {
109         struct mdd_object_user *mou;
110
111         ENTRY;
112
113         OBD_SLAB_ALLOC_PTR(mou, mdd_object_kmem);
114         if (mou == NULL)
115                 RETURN(ERR_PTR(-ENOMEM));
116
117         mou->mou_open_flags = open_flags;
118         mou->mou_uidgid = ((__u64)uid << 32) | gid;
119         mou->mou_opencount = 0;
120         mou->mou_deniednext = ktime_set(0, 0);
121
122         RETURN(mou);
123 }
124
125 /**
126  * Free a user and its sub-structures.
127  *
128  * \param mou [IN]  user to be freed.
129  */
130 static void mdd_obj_user_free(struct mdd_object_user *mou)
131 {
132         OBD_SLAB_FREE_PTR(mou, mdd_object_kmem);
133 }
134
135 /**
136  * Find if UID/GID already has this file open
137  *
138  * Caller should have write-locked \param mdd_obj.
139  * \param mdd_obj [IN] mdd_obj
140  * \param uid [IN] client uid
141  * \param gid [IN] client gid
142  * \retval user pointer or NULL if not found
143  */
144 static
145 struct mdd_object_user *mdd_obj_user_find(struct mdd_object *mdd_obj,
146                                           uid_t uid, gid_t gid,
147                                           u64 open_flags)
148 {
149         struct mdd_object_user *mou;
150         __u64 uidgid;
151
152         ENTRY;
153
154         uidgid = ((__u64)uid << 32) | gid;
155         list_for_each_entry(mou, &mdd_obj->mod_users, mou_list) {
156                 if (mou->mou_uidgid == uidgid &&
157                     flags_helper(mou->mou_open_flags) ==
158                     flags_helper(open_flags))
159                         RETURN(mou);
160         }
161         RETURN(NULL);
162 }
163
164 /**
165  * Add a user to the list of openers for this file
166  *
167  * Caller should have write-locked \param mdd_obj.
168  * \param mdd_obj [IN] mdd_obj
169  * \param mou [IN] user
170  * \retval 0 success
171  * \retval -ve failure
172  */
173 static int mdd_obj_user_add(struct mdd_object *mdd_obj,
174                             struct mdd_object_user *mou,
175                             bool denied)
176 {
177         struct mdd_device *mdd = mdd_obj2mdd_dev(mdd_obj);
178         struct mdd_object_user *tmp;
179         __u32 uid = mou->mou_uidgid >> 32;
180         __u32 gid = mou->mou_uidgid & ((1UL << 32) - 1);
181
182         ENTRY;
183         tmp = mdd_obj_user_find(mdd_obj, uid, gid, mou->mou_open_flags);
184         if (tmp != NULL)
185                 RETURN(-EEXIST);
186
187         list_add_tail(&mou->mou_list, &mdd_obj->mod_users);
188
189         if (denied)
190                 /* next 'access denied' notification cannot happen before
191                  * mou_deniednext
192                  */
193                 mou->mou_deniednext =
194                         ktime_add(ktime_get(),
195                                   ktime_set(mdd->mdd_cl.mc_deniednext, 0));
196         else
197                 mou->mou_opencount++;
198
199         RETURN(0);
200 }
201 /**
202  * Remove UID from the list
203  *
204  * Caller should have write-locked \param mdd_obj.
205  * \param mdd_obj [IN] mdd_obj
206  * \param uid [IN] user
207  * \retval -ve failure
208  */
209 static int mdd_obj_user_remove(struct mdd_object *mdd_obj,
210                                struct mdd_object_user *mou)
211 {
212         ENTRY;
213
214         if (mou == NULL)
215                 RETURN(-ENOENT);
216
217         list_del_init(&mou->mou_list);
218
219         mdd_obj_user_free(mou);
220
221         RETURN(0);
222 }
223 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
224                struct lu_attr *la)
225 {
226         int rc;
227
228         if (mdd_object_exists(obj) == 0)
229                 return -ENOENT;
230
231         rc = mdo_attr_get(env, obj, la);
232         if (unlikely(rc != 0)) {
233                 if (rc == -ENOENT)
234                         obj->mod_flags |= DEAD_OBJ;
235                 return rc;
236         }
237
238         if (la->la_valid & LA_FLAGS && la->la_flags & LUSTRE_ORPHAN_FL)
239                 obj->mod_flags |= ORPHAN_OBJ | DEAD_OBJ;
240
241         return 0;
242 }
243
244 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
245 {
246         return lu_env_info(env, &mdd_thread_key);
247 }
248
249 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
250 {
251         struct lu_buf *buf;
252
253         buf = &mdd_env_info(env)->mdi_buf[0];
254         buf->lb_buf = area;
255         buf->lb_len = len;
256         return buf;
257 }
258
259 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
260                                        const void *area, ssize_t len)
261 {
262         struct lu_buf *buf;
263
264         buf = &mdd_env_info(env)->mdi_buf[0];
265         buf->lb_buf = (void *)area;
266         buf->lb_len = len;
267         return buf;
268 }
269
270 struct lu_object *mdd_object_alloc(const struct lu_env *env,
271                                    const struct lu_object_header *hdr,
272                                    struct lu_device *d)
273 {
274         struct mdd_device *mdd = lu2mdd_dev(d);
275         struct mdd_object *mdd_obj;
276         struct lu_object *o;
277
278         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, GFP_NOFS);
279         if (!mdd_obj)
280                 return NULL;
281
282         o = mdd2lu_obj(mdd_obj);
283         lu_object_init(o, NULL, d);
284         mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
285         mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
286         mdd_obj->mod_count = 0;
287         obd_obt_init(mdd2obd_dev(mdd));
288         o->lo_ops = &mdd_lu_obj_ops;
289         INIT_LIST_HEAD(&mdd_obj->mod_users);
290
291         return o;
292 }
293
294 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
295                            const struct lu_object_conf *unused)
296 {
297         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
298         struct mdd_object *mdd_obj = lu2mdd_obj(o);
299         struct lu_object  *below;
300         struct lu_device  *under;
301         ENTRY;
302
303         mdd_obj->mod_cltime = ktime_set(0, 0);
304         under = &d->mdd_child->dd_lu_dev;
305         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
306         if (IS_ERR(below))
307                 RETURN(PTR_ERR(below));
308
309         lu_object_add(o, below);
310
311         RETURN(0);
312 }
313
314 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
315 {
316         int rc = 0;
317
318         if (lu_object_exists(o)) {
319                 struct mdd_object *mdd_obj = lu2mdd_obj(o);
320                 struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
321
322                 rc = mdd_la_get(env, mdd_obj, attr);
323         }
324
325         return rc;
326 }
327
328 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
329 {
330         struct mdd_object *mdd = lu2mdd_obj(o);
331         struct mdd_object_user *mou, *tmp2;
332
333         /* free user list */
334         list_for_each_entry_safe(mou, tmp2, &mdd->mod_users, mou_list) {
335                 list_del(&mou->mou_list);
336                 mdd_obj_user_free(mou);
337         }
338
339         lu_object_fini(o);
340         /* mdd doesn't contain an lu_object_header, so don't need call_rcu */
341         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
342 }
343
344 static int mdd_object_print(const struct lu_env *env, void *cookie,
345                             lu_printer_t p, const struct lu_object *o)
346 {
347         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
348
349         return (*p)(env, cookie,
350                     LUSTRE_MDD_NAME"-object@%p(open_count=%d, valid=%x, cltime=%lldns, flags=%lx)",
351                     mdd, mdd->mod_count, mdd->mod_valid,
352                     ktime_to_ns(mdd->mod_cltime), mdd->mod_flags);
353 }
354
355 static const struct lu_object_operations mdd_lu_obj_ops = {
356         .loo_object_init    = mdd_object_init,
357         .loo_object_start   = mdd_object_start,
358         .loo_object_free    = mdd_object_free,
359         .loo_object_print   = mdd_object_print,
360 };
361
362 struct mdd_object *mdd_object_find(const struct lu_env *env,
363                                    struct mdd_device *d,
364                                    const struct lu_fid *f)
365 {
366         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
367 }
368
369 /*
370  * No permission check is needed.
371  */
372 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
373                  struct md_attr *ma)
374 {
375         struct mdd_object *mdd_obj = md2mdd_obj(obj);
376         int               rc;
377
378         ENTRY;
379
380         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
381         if ((ma->ma_need & MA_INODE) != 0 && mdd_is_dead_obj(mdd_obj))
382                 ma->ma_attr.la_nlink = 0;
383
384         RETURN(rc);
385 }
386
387 /*
388  * No permission check is needed.
389  */
390 static int mdd_xattr_get(const struct lu_env *env,
391                          struct md_object *obj, struct lu_buf *buf,
392                          const char *name)
393 {
394         struct mdd_object *mdd_obj = md2mdd_obj(obj);
395         struct mdd_device *mdd;
396         int rc;
397
398         ENTRY;
399
400         if (mdd_object_exists(mdd_obj) == 0) {
401                 CERROR("%s: object "DFID" not found: rc = -2\n",
402                        mdd_obj_dev_name(mdd_obj),
403                        PFID(mdd_object_fid(mdd_obj)));
404                 return -ENOENT;
405         }
406
407         /* If the object has been destroyed, then do not get LMVEA, because
408          * it needs to load stripes from the iteration of the master object,
409          * and it will cause problem if master object has been destroyed, see
410          * LU-6427 */
411         if (unlikely((mdd_obj->mod_flags & DEAD_OBJ) &&
412                      !(mdd_obj->mod_flags & ORPHAN_OBJ) &&
413                       strcmp(name, XATTR_NAME_LMV) == 0))
414                 RETURN(-ENOENT);
415
416         /* If the object has been delete from the namespace, then
417          * get linkEA should return -ENOENT as well */
418         if (unlikely((mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ)) &&
419                       strcmp(name, XATTR_NAME_LINK) == 0))
420                 RETURN(-ENOENT);
421
422         mdd_read_lock(env, mdd_obj, DT_TGT_CHILD);
423         rc = mdo_xattr_get(env, mdd_obj, buf, name);
424         mdd_read_unlock(env, mdd_obj);
425
426         mdd = mdo2mdd(obj);
427
428         /* record only getting user xattrs and acls */
429         if (rc >= 0 && buf->lb_buf &&
430             mdd_changelog_enabled(env, mdd, CL_GETXATTR) &&
431             (has_prefix(name, XATTR_USER_PREFIX) ||
432              has_prefix(name, XATTR_NAME_POSIX_ACL_ACCESS) ||
433              has_prefix(name, XATTR_NAME_POSIX_ACL_DEFAULT))) {
434                 struct thandle *handle;
435                 int rc2;
436
437                 LASSERT(mdd_object_fid(mdd_obj) != NULL);
438
439                 handle = mdd_trans_create(env, mdd);
440                 if (IS_ERR(handle))
441                         RETURN(PTR_ERR(handle));
442
443                 rc2 = mdd_declare_changelog_store(env, mdd, CL_GETXATTR, NULL,
444                                                   NULL, handle);
445                 if (rc2)
446                         GOTO(stop, rc2);
447
448                 rc2 = mdd_trans_start(env, mdd, handle);
449                 if (rc2)
450                         GOTO(stop, rc2);
451
452                 rc2 = mdd_changelog_data_store_by_fid(env, mdd, CL_GETXATTR, 0,
453                                                       mdd_object_fid(mdd_obj),
454                                                       NULL, name, handle);
455
456 stop:
457                 rc2 = mdd_trans_stop(env, mdd, rc2, handle);
458                 if (rc2)
459                         rc = rc2;
460         }
461
462         RETURN(rc);
463 }
464
465 /*
466  * Permission check is done when open,
467  * no need check again.
468  */
469 int mdd_readlink(const struct lu_env *env, struct md_object *obj,
470                  struct lu_buf *buf)
471 {
472         struct mdd_object *mdd_obj = md2mdd_obj(obj);
473         struct dt_object  *next;
474         loff_t             pos = 0;
475         int                rc;
476         ENTRY;
477
478         if (mdd_object_exists(mdd_obj) == 0) {
479                 CERROR("%s: object "DFID" not found: rc = -2\n",
480                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
481                 return -ENOENT;
482         }
483
484         next = mdd_object_child(mdd_obj);
485         LASSERT(next != NULL);
486         LASSERT(next->do_body_ops != NULL);
487         LASSERT(next->do_body_ops->dbo_read != NULL);
488         mdd_read_lock(env, mdd_obj, DT_TGT_CHILD);
489         rc = dt_read(env, next, buf, &pos);
490         mdd_read_unlock(env, mdd_obj);
491         RETURN(rc);
492 }
493
494 /*
495  * No permission check is needed.
496  */
497 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
498                           struct lu_buf *buf)
499 {
500         struct mdd_object *mdd_obj = md2mdd_obj(obj);
501         int rc;
502
503         ENTRY;
504
505         mdd_read_lock(env, mdd_obj, DT_TGT_CHILD);
506         rc = mdo_xattr_list(env, mdd_obj, buf);
507         mdd_read_unlock(env, mdd_obj);
508
509         /* If the buffer is NULL then we are only here to get the
510          * length of the xattr name list. */
511         if (rc < 0 || buf->lb_buf == NULL)
512                 RETURN(rc);
513
514         /*
515          * Filter out XATTR_NAME_LINK if this is an orphan object.  See
516          * mdd_xattr_get().
517          */
518         if (unlikely(mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ))) {
519                 char   *end = (char *)buf->lb_buf + rc;
520                 char   *p = buf->lb_buf;
521
522                 while (p < end) {
523                         char   *next = p + strlen(p) + 1;
524
525                         if (strcmp(p, XATTR_NAME_LINK) == 0) {
526                                 if (end - next > 0)
527                                         memmove(p, next, end - next);
528                                 rc -= next - p;
529                                 CDEBUG(D_INFO, "Filtered out "XATTR_NAME_LINK
530                                        " of orphan "DFID"\n",
531                                        PFID(mdd_object_fid(mdd_obj)));
532                                 break;
533                         }
534
535                         p = next;
536                 }
537         }
538
539         RETURN(rc);
540 }
541
542 int mdd_invalidate(const struct lu_env *env, struct md_object *obj)
543 {
544         return mdo_invalidate(env, md2mdd_obj(obj));
545 }
546
547 int mdd_declare_create_object_internal(const struct lu_env *env,
548                                        struct mdd_object *p,
549                                        struct mdd_object *c,
550                                        struct lu_attr *attr,
551                                        struct thandle *handle,
552                                        const struct md_op_spec *spec,
553                                        struct dt_allocation_hint *hint)
554 {
555         struct dt_object_format *dof = &mdd_env_info(env)->mdi_dof;
556         const struct dt_index_features *feat = spec->sp_feat;
557         int rc;
558         ENTRY;
559
560         if (feat != &dt_directory_features && feat != NULL) {
561                 dof->dof_type = DFT_INDEX;
562                 dof->u.dof_idx.di_feat = feat;
563         } else {
564                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
565                 if (dof->dof_type == DFT_REGULAR) {
566                         dof->u.dof_reg.striped =
567                                 md_should_create(spec->sp_cr_flags);
568                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
569                                 dof->u.dof_reg.striped = 0;
570                         /* is this replay? */
571                         if (spec->no_create)
572                                 dof->u.dof_reg.striped = 0;
573                 }
574         }
575
576         rc = mdo_declare_create_object(env, c, attr, hint, dof, handle);
577
578         RETURN(rc);
579 }
580
581 int mdd_create_object_internal(const struct lu_env *env, struct mdd_object *p,
582                                struct mdd_object *c, struct lu_attr *attr,
583                                struct thandle *handle,
584                                const struct md_op_spec *spec,
585                                struct dt_allocation_hint *hint)
586 {
587         struct dt_object_format *dof = &mdd_env_info(env)->mdi_dof;
588         int rc;
589         ENTRY;
590
591         LASSERT(!mdd_object_exists(c));
592
593         rc = mdo_create_object(env, c, attr, hint, dof, handle);
594
595         RETURN(rc);
596 }
597
598 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
599                           const struct lu_attr *attr, struct thandle *handle,
600                           int needacl)
601 {
602         int rc;
603         ENTRY;
604
605         rc = mdo_attr_set(env, obj, attr, handle);
606 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
607         if (!rc && (attr->la_valid & LA_MODE) && needacl)
608                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
609 #endif
610         RETURN(rc);
611 }
612
613 int mdd_update_time(const struct lu_env *env, struct mdd_object *obj,
614                     const struct lu_attr *oattr, struct lu_attr *attr,
615                     struct thandle *handle)
616 {
617         int rc = 0;
618         ENTRY;
619
620         LASSERT(attr->la_valid & LA_CTIME);
621         LASSERT(oattr != NULL);
622
623         /* Make sure the ctime is increased only, however, it's not strictly
624          * reliable at here because there is not guarantee to hold lock on
625          * object, so we just bypass some unnecessary cmtime setting first
626          * and OSD has to check it again. */
627         if (attr->la_ctime < oattr->la_ctime)
628                 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
629         else if (attr->la_valid == LA_CTIME &&
630                  attr->la_ctime == oattr->la_ctime)
631                 attr->la_valid &= ~LA_CTIME;
632
633         if (attr->la_valid != 0)
634                 rc = mdd_attr_set_internal(env, obj, attr, handle, 0);
635         RETURN(rc);
636 }
637
638
639 static bool is_project_state_change(const struct lu_attr *oattr,
640                                     struct lu_attr *la)
641 {
642         if (la->la_valid & LA_PROJID &&
643             oattr->la_projid != la->la_projid)
644                 return true;
645
646         if ((la->la_valid & LA_FLAGS) &&
647             (la->la_flags & LUSTRE_PROJINHERIT_FL) !=
648             (oattr->la_flags & LUSTRE_PROJINHERIT_FL))
649                 return true;
650
651         return false;
652 }
653
654 /*
655  * This gives the same functionality as the code between
656  * sys_chmod and inode_setattr
657  * chown_common and inode_setattr
658  * utimes and inode_setattr
659  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
660  */
661 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
662                         const struct lu_attr *oattr, struct lu_attr *la,
663                         const struct md_attr *ma)
664 {
665         struct lu_ucred  *uc;
666         int               rc = 0;
667         const unsigned long flags = ma->ma_attr_flags;
668
669         ENTRY;
670
671         if (!la->la_valid)
672                 RETURN(0);
673
674         /* Do not permit change file type */
675         if (la->la_valid & LA_TYPE)
676                 RETURN(-EPERM);
677
678         /* They should not be processed by setattr */
679         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
680                 RETURN(-EPERM);
681
682         LASSERT(oattr != NULL);
683
684         uc = lu_ucred_check(env);
685         if (uc == NULL)
686                 RETURN(0);
687
688         if (is_project_state_change(oattr, la)) {
689                 if (!cap_raised(uc->uc_cap, CAP_SYS_RESOURCE) &&
690                     !lustre_in_group_p(uc, ma->ma_enable_chprojid_gid) &&
691                     !(ma->ma_enable_chprojid_gid == -1 &&
692                       mdd_permission_internal(env, obj, oattr, MAY_WRITE)))
693                         RETURN(-EPERM);
694         }
695
696         if (la->la_valid == LA_CTIME) {
697                 if (!(flags & MDS_PERM_BYPASS))
698                         /* This is only for set ctime when rename's source is
699                          * on remote MDS. */
700                         rc = mdd_may_delete(env, NULL, NULL, obj, oattr, NULL,
701                                             1, 0);
702                 if (rc == 0 && la->la_ctime <= oattr->la_ctime)
703                         la->la_valid &= ~LA_CTIME;
704                 RETURN(rc);
705         }
706
707         if (flags & MDS_CLOSE_UPDATE_TIMES &&
708             la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
709                 /* This is an atime/mtime/ctime attribute update for
710                  * close RPCs.
711                  */
712                 if (la->la_valid & LA_ATIME &&
713                     la->la_atime <= (oattr->la_atime +
714                                 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
715                         la->la_valid &= ~LA_ATIME;
716                 if (la->la_valid & LA_CTIME && la->la_ctime <= oattr->la_ctime)
717                         la->la_valid &= ~LA_CTIME;
718                 if (la->la_valid & LA_MTIME && la->la_mtime <= oattr->la_mtime)
719                         la->la_valid &= ~LA_MTIME;
720                 RETURN(0);
721         }
722
723         /* Check if flags change. */
724         if (la->la_valid & LA_FLAGS) {
725                 unsigned int oldflags = oattr->la_flags &
726                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
727                 unsigned int newflags = la->la_flags &
728                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
729
730                 if ((uc->uc_fsuid != oattr->la_uid) &&
731                     !cap_raised(uc->uc_cap, CAP_FOWNER))
732                         RETURN(-EPERM);
733
734                 /* The IMMUTABLE and APPEND_ONLY flags can
735                  * only be changed by the relevant capability. */
736                 if ((oldflags ^ newflags) &&
737                     !cap_raised(uc->uc_cap, CAP_LINUX_IMMUTABLE))
738                         RETURN(-EPERM);
739
740                 if (!S_ISDIR(oattr->la_mode)) {
741                         la->la_flags &= ~(LUSTRE_DIRSYNC_FL | LUSTRE_TOPDIR_FL);
742                 } else if (la->la_flags & LUSTRE_ENCRYPT_FL) {
743                         /* when trying to add encryption flag on dir,
744                          * make sure it is empty
745                          */
746                         rc = mdd_dir_is_empty(env, obj);
747                         if (rc)
748                                 RETURN(rc);
749                         rc = 0;
750                 }
751         }
752
753         if (oattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL) &&
754             (la->la_valid & ~LA_FLAGS) &&
755             !(flags & MDS_PERM_BYPASS))
756                 RETURN(-EPERM);
757
758         /* Check for setting the obj time. */
759         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
760             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
761                 if ((uc->uc_fsuid != oattr->la_uid) &&
762                     !cap_raised(uc->uc_cap, CAP_FOWNER)) {
763                         rc = mdd_permission_internal(env, obj, oattr,
764                                                      MAY_WRITE);
765                         if (rc)
766                                 RETURN(rc);
767                 }
768         }
769
770         if (la->la_valid & LA_KILL_SUID) {
771                 la->la_valid &= ~LA_KILL_SUID;
772                 if ((oattr->la_mode & S_ISUID) &&
773                     !(la->la_valid & LA_MODE)) {
774                         la->la_mode = oattr->la_mode;
775                         la->la_valid |= LA_MODE;
776                 }
777                 la->la_mode &= ~S_ISUID;
778         }
779
780         if (la->la_valid & LA_KILL_SGID) {
781                 la->la_valid &= ~LA_KILL_SGID;
782                 if (((oattr->la_mode & (S_ISGID | S_IXGRP)) ==
783                         (S_ISGID | S_IXGRP)) &&
784                     !(la->la_valid & LA_MODE)) {
785                         la->la_mode = oattr->la_mode;
786                         la->la_valid |= LA_MODE;
787                 }
788                 la->la_mode &= ~S_ISGID;
789         }
790
791         /* Make sure a caller can chmod. */
792         if (la->la_valid & LA_MODE) {
793                 if (!(flags & MDS_PERM_BYPASS) &&
794                     (uc->uc_fsuid != oattr->la_uid) &&
795                     !cap_raised(uc->uc_cap, CAP_FOWNER))
796                         RETURN(-EPERM);
797
798                 if (la->la_mode == (umode_t) -1)
799                         la->la_mode = oattr->la_mode;
800                 else
801                         la->la_mode = (la->la_mode & S_IALLUGO) |
802                                         (oattr->la_mode & ~S_IALLUGO);
803
804                 /* Also check the setgid bit! */
805                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
806                                        la->la_gid : oattr->la_gid) &&
807                     !cap_raised(uc->uc_cap, CAP_FSETID))
808                         la->la_mode &= ~S_ISGID;
809         } else {
810                la->la_mode = oattr->la_mode;
811         }
812
813         /* Make sure a caller can chown. */
814         if (la->la_valid & LA_UID) {
815                 if (la->la_uid == (uid_t) -1)
816                         la->la_uid = oattr->la_uid;
817                 if (((uc->uc_fsuid != oattr->la_uid) ||
818                      (la->la_uid != oattr->la_uid)) &&
819                     !cap_raised(uc->uc_cap, CAP_CHOWN))
820                         RETURN(-EPERM);
821
822                 /* If the user or group of a non-directory has been
823                  * changed by a non-root user, remove the setuid bit.
824                  * 19981026 David C Niemi <niemi@tux.org>
825                  *
826                  * Changed this to apply to all users, including root,
827                  * to avoid some races. This is the behavior we had in
828                  * 2.0. The check for non-root was definitely wrong
829                  * for 2.2 anyway, as it should have been using
830                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
831                 if (((oattr->la_mode & S_ISUID) == S_ISUID) &&
832                 !S_ISDIR(oattr->la_mode)) {
833                         la->la_mode &= ~S_ISUID;
834                         la->la_valid |= LA_MODE;
835                 }
836         }
837
838         /* Make sure caller can chgrp. */
839         if (la->la_valid & LA_GID) {
840                 if (la->la_gid == (gid_t) -1)
841                         la->la_gid = oattr->la_gid;
842                 if (((uc->uc_fsuid != oattr->la_uid) ||
843                      ((la->la_gid != oattr->la_gid) &&
844                       !lustre_in_group_p(uc, la->la_gid))) &&
845                     !cap_raised(uc->uc_cap, CAP_CHOWN))
846                         RETURN(-EPERM);
847
848                 /* Likewise, if the user or group of a non-directory
849                  * has been changed by a non-root user, remove the
850                  * setgid bit UNLESS there is no group execute bit
851                  * (this would be a file marked for mandatory
852                  * locking).  19981026 David C Niemi <niemi@tux.org>
853                  *
854                  * Removed the fsuid check (see the comment above) --
855                  * 19990830 SD. */
856                 if (((oattr->la_mode & (S_ISGID | S_IXGRP)) ==
857                     (S_ISGID | S_IXGRP)) && !S_ISDIR(oattr->la_mode)) {
858                         la->la_mode &= ~S_ISGID;
859                         la->la_valid |= LA_MODE;
860                 }
861         }
862
863         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
864                 if (!((flags & MDS_OWNEROVERRIDE) &&
865                       (uc->uc_fsuid == oattr->la_uid)) &&
866                     !(flags & MDS_PERM_BYPASS)) {
867                         rc = mdd_permission_internal(env, obj, oattr,
868                                                      MAY_WRITE);
869                         if (rc != 0)
870                                 RETURN(rc);
871                 }
872         }
873
874         if (la->la_valid & LA_CTIME) {
875                 /**
876                  * The pure setattr, it has the priority over what is
877                  * already set, do not drop it if ctime is equal.
878                  */
879                 if (la->la_ctime < oattr->la_ctime)
880                         la->la_valid &= ~(LA_ATIME | LA_MTIME | LA_CTIME);
881         }
882
883         RETURN(0);
884 }
885
886 static int mdd_changelog_data_store_by_fid(const struct lu_env *env,
887                                            struct mdd_device *mdd,
888                                            enum changelog_rec_type type,
889                                            enum changelog_rec_flags clf_flags,
890                                            const struct lu_fid *fid,
891                                            const struct lu_fid *pfid,
892                                            const char *xattr_name,
893                                            struct thandle *handle)
894 {
895         const struct lu_ucred *uc = lu_ucred(env);
896         enum changelog_rec_extra_flags xflags = CLFE_INVALID;
897         struct llog_changelog_rec *rec;
898         struct lu_buf *buf;
899         int reclen;
900         int rc;
901
902         clf_flags = (clf_flags & CLF_FLAGMASK) | CLF_VERSION | CLF_EXTRA_FLAGS;
903
904         if (uc) {
905                 if (uc->uc_jobid[0] != '\0')
906                         clf_flags |= CLF_JOBID;
907                 xflags |= CLFE_UIDGID;
908                 xflags |= CLFE_NID;
909         }
910         if (type == CL_OPEN || type == CL_DN_OPEN)
911                 xflags |= CLFE_OPEN;
912         if (type == CL_SETXATTR || type == CL_GETXATTR)
913                 xflags |= CLFE_XATTR;
914
915         reclen = llog_data_len(LLOG_CHANGELOG_HDR_SZ +
916                                changelog_rec_offset(clf_flags & CLF_SUPPORTED,
917                                                     xflags & CLFE_SUPPORTED));
918         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_chlg_buf, reclen);
919         if (buf->lb_buf == NULL)
920                 RETURN(-ENOMEM);
921         rec = buf->lb_buf;
922
923         rec->cr_hdr.lrh_len = reclen;
924         rec->cr.cr_flags = clf_flags;
925         rec->cr.cr_type = (__u32)type;
926         rec->cr.cr_tfid = *fid;
927         if (pfid)
928                 rec->cr.cr_pfid = *pfid;
929         rec->cr.cr_namelen = 0;
930
931         if (clf_flags & CLF_JOBID)
932                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
933
934         if (clf_flags & CLF_EXTRA_FLAGS) {
935                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
936                 if (xflags & CLFE_UIDGID)
937                         mdd_changelog_rec_extra_uidgid(&rec->cr,
938                                                        uc->uc_uid, uc->uc_gid);
939                 if (xflags & CLFE_NID)
940                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
941                 if (xflags & CLFE_OPEN)
942                         mdd_changelog_rec_extra_omode(&rec->cr, clf_flags);
943                 if (xflags & CLFE_XATTR) {
944                         if (xattr_name == NULL)
945                                 RETURN(-EINVAL);
946                         mdd_changelog_rec_extra_xattr(&rec->cr, xattr_name);
947                 }
948         }
949
950         rc = mdd_changelog_store(env, mdd, rec, handle);
951         RETURN(rc);
952 }
953
954
955 /** Store a data change changelog record
956  * If this fails, we must fail the whole transaction; we don't
957  * want the change to commit without the log entry.
958  * \param mdd_obj - mdd_object of change
959  * \param handle - transaction handle
960  * \param pfid - parent FID for CL_MTIME changelogs
961  */
962 int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
963                              enum changelog_rec_type type,
964                              enum changelog_rec_flags clf_flags,
965                              struct mdd_object *mdd_obj, struct thandle *handle,
966                              const struct lu_fid *pfid)
967 {
968         int                              rc;
969
970         LASSERT(mdd_obj != NULL);
971         LASSERT(handle != NULL);
972
973         if (!mdd_changelog_enabled(env, mdd, type))
974                 RETURN(0);
975
976         if (mdd_is_volatile_obj(mdd_obj))
977                 RETURN(0);
978
979         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
980             ktime_before(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
981                 /* Don't need multiple updates in this log */
982                 /* Don't check under lock - no big deal if we get an extra
983                    entry */
984                 RETURN(0);
985         }
986
987         rc = mdd_changelog_data_store_by_fid(env, mdd, type, clf_flags,
988                                              mdd_object_fid(mdd_obj), pfid,
989                                              NULL, handle);
990         if (rc == 0)
991                 mdd_obj->mod_cltime = ktime_get();
992
993         RETURN(rc);
994 }
995
996 int mdd_changelog_data_store_xattr(const struct lu_env *env,
997                                    struct mdd_device *mdd,
998                                    enum changelog_rec_type type,
999                                    enum changelog_rec_flags clf_flags,
1000                                    struct mdd_object *mdd_obj,
1001                                    const char *xattr_name,
1002                                    struct thandle *handle)
1003 {
1004         int rc;
1005
1006         LASSERT(mdd_obj != NULL);
1007         LASSERT(handle != NULL);
1008
1009         if (!mdd_changelog_enabled(env, mdd, type))
1010                 RETURN(0);
1011
1012         if (mdd_is_volatile_obj(mdd_obj))
1013                 RETURN(0);
1014
1015         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1016             ktime_before(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1017                 /* Don't need multiple updates in this log */
1018                 /* Don't check under lock - no big deal if we get an extra
1019                  * entry
1020                  */
1021                 RETURN(0);
1022         }
1023
1024         rc = mdd_changelog_data_store_by_fid(env, mdd, type, clf_flags,
1025                                              mdd_object_fid(mdd_obj), NULL,
1026                                              xattr_name, handle);
1027         if (rc == 0)
1028                 mdd_obj->mod_cltime = ktime_get();
1029
1030         RETURN(rc);
1031 }
1032
1033 /* only the bottom CLF_FLAGSHIFT bits of @flags are stored in the record,
1034  * except for open flags have a dedicated record to store 32 bits of flags */
1035 static int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1036                          enum changelog_rec_flags clf_flags,
1037                          struct md_device *m, const struct lu_fid *fid)
1038 {
1039         struct thandle *handle;
1040         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1041         int rc;
1042         ENTRY;
1043
1044         LASSERT(fid != NULL);
1045
1046         /* We'll check this again below, but we check now before we
1047          * start a transaction. */
1048         if (!mdd_changelog_enabled(env, mdd, type))
1049                 RETURN(0);
1050
1051         handle = mdd_trans_create(env, mdd);
1052         if (IS_ERR(handle))
1053                 RETURN(PTR_ERR(handle));
1054
1055         rc = mdd_declare_changelog_store(env, mdd, type, NULL, NULL, handle);
1056         if (rc)
1057                 GOTO(stop, rc);
1058
1059         rc = mdd_trans_start(env, mdd, handle);
1060         if (rc)
1061                 GOTO(stop, rc);
1062
1063         rc = mdd_changelog_data_store_by_fid(env, mdd, type, clf_flags,
1064                                              fid, NULL, NULL, handle);
1065
1066 stop:
1067         rc = mdd_trans_stop(env, mdd, rc, handle);
1068
1069         RETURN(rc);
1070 }
1071
1072 /**
1073  * Save LMA extended attributes with data from \a ma.
1074  *
1075  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1076  * not, LMA EA will be first read from disk, modified and write back.
1077  *
1078  */
1079 /* Precedence for choosing record type when multiple
1080  * attributes change: setattr > mtime > ctime > atime
1081  * (ctime changes when mtime does, plus chmod/chown.
1082  * atime and ctime are independent.) */
1083 static int mdd_attr_set_changelog(const struct lu_env *env,
1084                                   struct md_object *obj, struct thandle *handle,
1085                                   const struct lu_fid *pfid, __u64 valid)
1086 {
1087         struct mdd_device *mdd = mdo2mdd(obj);
1088         int bits, type = 0;
1089
1090         bits =  (valid & LA_SIZE)  ? BIT(CL_TRUNC) : 0;
1091         bits |= (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? BIT(CL_SETATTR) : 0;
1092         bits |= (valid & LA_MTIME) ? BIT(CL_MTIME) : 0;
1093         bits |= (valid & LA_CTIME) ? BIT(CL_CTIME) : 0;
1094         bits |= (valid & LA_ATIME) ? BIT(CL_ATIME) : 0;
1095         bits = bits & mdd->mdd_cl.mc_current_mask;
1096         /* This is an implementation limit rather than a protocol limit */
1097         BUILD_BUG_ON(CL_LAST > sizeof(int) * 8);
1098         if (bits == 0)
1099                 return 0;
1100
1101         /* The record type is the lowest non-masked set bit */
1102         type = __ffs(bits);
1103
1104         /* XXX: we only store the low CLF_FLAGMASK bits of la_valid */
1105         return mdd_changelog_data_store(env, mdd, type, valid, md2mdd_obj(obj),
1106                                         handle, pfid);
1107 }
1108
1109 static int mdd_declare_attr_set(const struct lu_env *env,
1110                                 struct mdd_device *mdd,
1111                                 struct mdd_object *obj,
1112                                 const struct lu_attr *attr,
1113                                 struct thandle *handle)
1114 {
1115         int rc;
1116
1117         rc = mdo_declare_attr_set(env, obj, attr, handle);
1118         if (rc)
1119                 return rc;
1120
1121 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1122         if (attr->la_valid & LA_MODE) {
1123                 mdd_read_lock(env, obj, DT_TGT_CHILD);
1124                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1125                                    XATTR_NAME_ACL_ACCESS);
1126                 mdd_read_unlock(env, obj);
1127                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1128                         rc = 0;
1129                 else if (rc < 0)
1130                         return rc;
1131
1132                 if (rc != 0) {
1133                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1134                         rc = mdo_declare_xattr_set(env, obj, buf,
1135                                                    XATTR_NAME_ACL_ACCESS, 0,
1136                                                    handle);
1137                         if (rc)
1138                                 return rc;
1139                 }
1140         }
1141 #endif
1142
1143         rc = mdd_declare_changelog_store(env, mdd, CL_SETXATTR, NULL, NULL,
1144                                          handle);
1145         return rc;
1146 }
1147
1148 /*
1149  * LU-3671
1150  * LU-7239
1151  *
1152  * permission changes may require sync operation, to mitigate performance
1153  * impact, only do this for dir and when permission is reduced.
1154  *
1155  * For regular files, version is updated with permission change (see VBR), async
1156  * permission won't cause any issue, while missing permission change on
1157  * directory may affect accessibility of other objects after recovery.
1158  */
1159 static inline bool permission_needs_sync(const struct lu_attr *old,
1160                                          const struct lu_attr *new)
1161 {
1162         if (!S_ISDIR(old->la_mode))
1163                 return false;
1164
1165         if (new->la_valid & LA_UID && old->la_uid != new->la_uid)
1166                 return true;
1167
1168         if (new->la_valid & LA_GID && old->la_gid != new->la_gid)
1169                 return true;
1170
1171         if (new->la_valid & LA_MODE) {
1172                 /* turned on sticky bit */
1173                 if (!(old->la_mode & S_ISVTX) && (new->la_mode & S_ISVTX))
1174                         return true;
1175
1176                 /* set-GID has no impact on what is allowed, not checked */
1177
1178                 /* turned off setuid bit, or one of rwx for someone */
1179                 if (((new->la_mode & old->la_mode) & (0777 | S_ISUID)) !=
1180                      (old->la_mode & (0777 | S_ISUID)))
1181                         return true;
1182         }
1183
1184         return false;
1185 }
1186
1187 static inline __u64 mdd_lmm_dom_size(void *buf)
1188 {
1189         struct lov_mds_md *lmm = buf;
1190         struct lov_comp_md_v1 *comp_v1;
1191         struct lov_mds_md *v1;
1192         __u32 off;
1193
1194         if (lmm == NULL)
1195                 return 0;
1196
1197         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
1198                 return 0;
1199
1200         comp_v1 = (struct lov_comp_md_v1 *)lmm;
1201         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
1202         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
1203
1204         /* DoM entry is the first entry always */
1205         if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == LOV_PATTERN_MDT)
1206                 return le64_to_cpu(comp_v1->lcm_entries[0].lcme_extent.e_end);
1207
1208         return 0;
1209 }
1210
1211 /* set attr and LOV EA at once, return updated attr */
1212 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1213                  const struct md_attr *ma)
1214 {
1215         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1216         struct mdd_device *mdd = mdo2mdd(obj);
1217         struct thandle *handle = NULL;
1218         struct lu_attr *la_copy = &mdd_env_info(env)->mdi_la_for_fix;
1219         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
1220         const struct lu_attr *la = &ma->ma_attr;
1221         struct lu_ucred  *uc;
1222         struct lquota_id_info qi;
1223         bool quota_reserved = false;
1224         bool chrgrp_by_unprivileged_user = false;
1225         int rc;
1226         ENTRY;
1227
1228         /* we do not use ->attr_set() for LOV/HSM EA any more */
1229         LASSERT((ma->ma_valid & MA_LOV) == 0);
1230         LASSERT((ma->ma_valid & MA_HSM) == 0);
1231
1232         rc = mdd_la_get(env, mdd_obj, attr);
1233         if (rc)
1234                 RETURN(rc);
1235
1236         *la_copy = ma->ma_attr;
1237         rc = mdd_fix_attr(env, mdd_obj, attr, la_copy, ma);
1238         if (rc)
1239                 RETURN(rc);
1240
1241         /* no need to setattr anymore */
1242         if (la_copy->la_valid == 0) {
1243                 CDEBUG(D_INODE,
1244                        "%s: no valid attribute on "DFID", previous was %#llx\n",
1245                        mdd_obj_dev_name(mdd_obj),
1246                        PFID(mdd_object_fid(mdd_obj)), la->la_valid);
1247
1248                 RETURN(0);
1249         }
1250
1251         /* If an unprivileged user changes group of some file,
1252          * the setattr operation will be processed synchronously to
1253          * honor the quota limit of the corresponding group. see LU-5152 */
1254         uc = lu_ucred_check(env);
1255         memset(&qi, 0, sizeof(qi));
1256         if (S_ISREG(attr->la_mode) && la->la_valid & LA_GID &&
1257             la->la_gid != attr->la_gid && uc != NULL && uc->uc_fsuid != 0) {
1258                 CDEBUG(D_QUOTA, "%s: reserve quota for changing group: gid=%u size=%llu\n",
1259                        mdd2obd_dev(mdd)->obd_name, la->la_gid, la->la_size);
1260
1261                 if (la->la_valid & LA_BLOCKS)
1262                         qi.lqi_space = la->la_blocks << 9;
1263                 else if (la->la_valid & LA_SIZE)
1264                         qi.lqi_space = la->la_size;
1265                 /* use local attr gotten above */
1266                 else if (attr->la_valid & LA_BLOCKS)
1267                         qi.lqi_space = attr->la_blocks << 9;
1268                 else if (attr->la_valid & LA_SIZE)
1269                         qi.lqi_space = attr->la_size;
1270
1271                 if (qi.lqi_space > 0) {
1272                         qi.lqi_id.qid_gid = la->la_gid;
1273                         qi.lqi_type = GRPQUOTA;
1274                         qi.lqi_space = toqb(qi.lqi_space);
1275                         qi.lqi_is_blk = true;
1276                         rc = dt_reserve_or_free_quota(env, mdd->mdd_bottom,
1277                                                       &qi);
1278
1279                         if (rc) {
1280                                 CDEBUG(D_QUOTA, "%s: failed to reserve quota for gid %d size %llu\n",
1281                                        mdd2obd_dev(mdd)->obd_name,
1282                                        la->la_gid, qi.lqi_space);
1283
1284                                 GOTO(out, rc);
1285                         }
1286
1287                         quota_reserved = true;
1288                         la_copy->la_valid |= LA_FLAGS;
1289                 }
1290
1291                 chrgrp_by_unprivileged_user = true;
1292
1293                 /* Flush the possible existing client setattr requests to OSTs
1294                  * to keep the order with the current setattr operation that
1295                  * will be sent directly to OSTs. see LU-5152 */
1296                 /* LU-11303 disable sync as this is too heavyweight.
1297                  * This should be replaced with a sync only for the object
1298                  * being modified here, not the whole filesystem.
1299                 rc = dt_sync(env, mdd->mdd_child);
1300                 if (rc)
1301                         GOTO(out, rc);
1302                  */
1303         }
1304
1305         handle = mdd_trans_create(env, mdd);
1306         if (IS_ERR(handle)) {
1307                 rc = PTR_ERR(handle);
1308                 handle = NULL;
1309
1310                 GOTO(out, rc);
1311         }
1312
1313         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la_copy, handle);
1314         if (rc)
1315                 GOTO(out, rc);
1316
1317         rc = mdd_trans_start(env, mdd, handle);
1318         if (rc)
1319                 GOTO(out, rc);
1320
1321         if (!chrgrp_by_unprivileged_user && mdd->mdd_sync_permission &&
1322             permission_needs_sync(attr, la))
1323                 handle->th_sync = 1;
1324
1325         if (la->la_valid & (LA_MTIME | LA_CTIME))
1326                 CDEBUG(D_INODE, "setting mtime %llu, ctime %llu\n",
1327                        la->la_mtime, la->la_ctime);
1328
1329         mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
1330
1331         /* LU-10509: setattr of LA_SIZE should be skipped case of DOM,
1332          * otherwise following truncate will do nothing and truncated
1333          * data may be read again. This is a quick fix until LU-11033
1334          * will be resolved.
1335          */
1336         if (la_copy->la_valid & LA_SIZE) {
1337                 struct lu_buf *lov_buf = mdd_buf_get(env, NULL, 0);
1338
1339                 rc = mdd_stripe_get(env, mdd_obj, lov_buf, XATTR_NAME_LOV);
1340                 if (rc) {
1341                         rc = 0;
1342                 } else {
1343                         if (mdd_lmm_dom_size(lov_buf->lb_buf) > 0)
1344                                 la_copy->la_valid &= ~LA_SIZE;
1345                         lu_buf_free(lov_buf);
1346                 }
1347         }
1348
1349         if (la_copy->la_valid) {
1350                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1351
1352                 if (rc == -EDQUOT && la_copy->la_flags & LUSTRE_SET_SYNC_FL) {
1353                         /* rollback to the original gid */
1354                         la_copy->la_flags &= ~LUSTRE_SET_SYNC_FL;
1355                         la_copy->la_gid = attr->la_gid;
1356                         mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1357                 }
1358         }
1359         mdd_write_unlock(env, mdd_obj);
1360
1361 out:
1362         if (rc == 0)
1363                 rc = mdd_attr_set_changelog(env, obj, handle, &ma->ma_pfid,
1364                                             la_copy->la_valid);
1365
1366         if (rc == 0 && quota_reserved) {
1367                 struct thandle *sub_th;
1368
1369                 sub_th = thandle_get_sub_by_dt(env, handle, mdd->mdd_bottom);
1370                 if (unlikely(IS_ERR(sub_th))) {
1371                         qi.lqi_space *= -1;
1372                         dt_reserve_or_free_quota(env, mdd->mdd_bottom, &qi);
1373                 } else {
1374                         sub_th->th_reserved_quota = qi;
1375                 }
1376         } else if (quota_reserved) {
1377                 qi.lqi_space *= -1;
1378                 dt_reserve_or_free_quota(env, mdd->mdd_bottom, &qi);
1379         }
1380
1381         if (handle != NULL)
1382                 rc = mdd_trans_stop(env, mdd, rc, handle);
1383
1384         return rc;
1385 }
1386
1387 static int mdd_xattr_sanity_check(const struct lu_env *env,
1388                                   struct mdd_object *obj,
1389                                   const struct lu_attr *attr,
1390                                   const char *name)
1391 {
1392         struct lu_ucred *uc     = lu_ucred_assert(env);
1393         ENTRY;
1394
1395         if (attr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
1396                 RETURN(-EPERM);
1397
1398         if (strncmp(XATTR_USER_PREFIX, name,
1399                     sizeof(XATTR_USER_PREFIX) - 1) == 0) {
1400                 /* For sticky directories, only the owner and privileged user
1401                  * can write attributes. */
1402                 if (S_ISDIR(attr->la_mode) && (attr->la_mode & S_ISVTX) &&
1403                     (uc->uc_fsuid != attr->la_uid) &&
1404                     !cap_raised(uc->uc_cap, CAP_FOWNER))
1405                         RETURN(-EPERM);
1406         } else if (strcmp(name, XATTR_NAME_SOM) != 0 &&
1407                    (uc->uc_fsuid != attr->la_uid) &&
1408                    !cap_raised(uc->uc_cap, CAP_FOWNER)) {
1409                 RETURN(-EPERM);
1410         }
1411
1412         RETURN(0);
1413 }
1414
1415 /**
1416  * Check if a string begins with a given prefix.
1417  *
1418  * \param str     String to check
1419  * \param prefix  Substring to check at the beginning of \a str
1420  * \return true/false whether the condition is verified.
1421  */
1422 static inline bool has_prefix(const char *str, const char *prefix)
1423 {
1424         return strncmp(prefix, str, strlen(prefix)) == 0;
1425 }
1426
1427 /**
1428  * Indicate the kind of changelog to store (if any) for a xattr set/del.
1429  *
1430  * \param[in]  xattr_name  Full extended attribute name.
1431  *
1432  * \return type of changelog to use, or CL_NONE if no changelog is to be emitted
1433  */
1434 static enum changelog_rec_type
1435 mdd_xattr_changelog_type(const struct lu_env *env, struct mdd_device *mdd,
1436                          const char *xattr_name)
1437 {
1438         /* Layout changes systematically recorded */
1439         if (strcmp(XATTR_NAME_LOV, xattr_name) == 0 ||
1440             strcmp(XATTR_LUSTRE_LOV, xattr_name) == 0 ||
1441             allowed_lustre_lov(xattr_name))
1442                 return CL_LAYOUT;
1443
1444         /* HSM information changes systematically recorded */
1445         if (strcmp(XATTR_NAME_HSM, xattr_name) == 0)
1446                 return CL_HSM;
1447
1448         /* Avoid logging SOM xattr for every file */
1449         if (strcmp(XATTR_NAME_SOM, xattr_name) == 0)
1450                 return CL_NONE;
1451
1452         if (has_prefix(xattr_name, XATTR_USER_PREFIX) ||
1453             has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_ACCESS) ||
1454             has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_DEFAULT) ||
1455             has_prefix(xattr_name, XATTR_TRUSTED_PREFIX) ||
1456             has_prefix(xattr_name, XATTR_SECURITY_PREFIX))
1457                 return CL_SETXATTR;
1458
1459         return CL_NONE;
1460 }
1461
1462 static int mdd_declare_xattr_set(const struct lu_env *env,
1463                                  struct mdd_device *mdd,
1464                                  struct mdd_object *obj,
1465                                  const struct lu_buf *buf,
1466                                  const char *name,
1467                                  int fl, struct thandle *handle)
1468 {
1469         enum changelog_rec_type type;
1470         int rc;
1471
1472         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1473         if (rc)
1474                 return rc;
1475
1476         type = mdd_xattr_changelog_type(env, mdd, name);
1477         if (type < 0)
1478                 return 0; /* no changelog to store */
1479
1480         return mdd_declare_changelog_store(env, mdd, type, NULL, NULL, handle);
1481 }
1482
1483 /*
1484  * Compare current and future data of HSM EA and add a changelog if needed.
1485  *
1486  * Caller should have write-locked \param obj.
1487  *
1488  * \param buf - Future HSM EA content.
1489  * \retval 0 if no changelog is needed or changelog was added properly.
1490  * \retval -ve errno if there was a problem
1491  */
1492 static int mdd_hsm_update_locked(const struct lu_env *env,
1493                                  struct md_object *obj,
1494                                  const struct lu_buf *buf,
1495                                  struct thandle *handle,
1496                                  enum changelog_rec_flags *clf_flags)
1497 {
1498         struct mdd_thread_info *info = mdd_env_info(env);
1499         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1500         struct lu_buf *current_buf;
1501         struct md_hsm *current_mh;
1502         struct md_hsm *new_mh;
1503         int rc;
1504
1505         ENTRY;
1506         OBD_ALLOC_PTR(current_mh);
1507         if (current_mh == NULL)
1508                 RETURN(-ENOMEM);
1509
1510         /* Read HSM attrs from disk */
1511         current_buf = lu_buf_check_and_alloc(&info->mdi_xattr_buf,
1512                         min_t(unsigned int,
1513                               mdd_obj2mdd_dev(mdd_obj)->mdd_dt_conf.ddp_max_ea_size,
1514                             XATTR_SIZE_MAX));
1515         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
1516         rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
1517         if (rc < 0 && rc != -ENODATA)
1518                 GOTO(free, rc);
1519         else if (rc == -ENODATA)
1520                 current_mh->mh_flags = 0;
1521
1522         /* Map future HSM xattr */
1523         OBD_ALLOC_PTR(new_mh);
1524         if (new_mh == NULL)
1525                 GOTO(free, rc = -ENOMEM);
1526         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1527
1528         rc = 0;
1529
1530         /* Flags differ, set flags for the changelog that will be added */
1531         if (current_mh->mh_flags != new_mh->mh_flags) {
1532                 hsm_set_cl_event(clf_flags, HE_STATE);
1533                 if (new_mh->mh_flags & HS_DIRTY)
1534                         hsm_set_cl_flags(clf_flags, CLF_HSM_DIRTY);
1535         }
1536
1537         OBD_FREE_PTR(new_mh);
1538         EXIT;
1539 free:
1540         OBD_FREE_PTR(current_mh);
1541         return rc;
1542 }
1543
1544 static int mdd_object_pfid_replace(const struct lu_env *env,
1545                                    struct mdd_object *o)
1546 {
1547         struct mdd_device *mdd = mdo2mdd(&o->mod_obj);
1548         struct thandle *handle;
1549         int rc;
1550
1551         handle = mdd_trans_create(env, mdd);
1552         if (IS_ERR(handle))
1553                 RETURN(PTR_ERR(handle));
1554
1555         handle->th_complex = 1;
1556
1557         /* it doesn't need to track the PFID update via llog, because LFSCK
1558          * will repair it even it goes wrong */
1559         rc = mdd_declare_xattr_set(env, mdd, o, NULL, XATTR_NAME_FID,
1560                                    0, handle);
1561         if (rc)
1562                 GOTO(out, rc);
1563
1564         rc = mdd_trans_start(env, mdd, handle);
1565         if (rc != 0)
1566                 GOTO(out, rc);
1567
1568         rc = mdo_xattr_set(env, o, NULL, XATTR_NAME_FID, 0, handle);
1569         if (rc)
1570                 GOTO(out, rc);
1571
1572 out:
1573         mdd_trans_stop(env, mdd, rc, handle);
1574         return rc;
1575 }
1576
1577
1578 static int mdd_declare_xattr_del(const struct lu_env *env,
1579                                  struct mdd_device *mdd,
1580                                  struct mdd_object *obj,
1581                                  const char *name,
1582                                  struct thandle *handle);
1583
1584 static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1585                          const char *name);
1586
1587 static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
1588                            struct md_object *md_vic)
1589 {
1590         struct mdd_device *mdd = mdo2mdd(md_obj);
1591         struct mdd_object *obj = md2mdd_obj(md_obj);
1592         struct mdd_object *vic = md2mdd_obj(md_vic);
1593         struct lu_buf *buf = &mdd_env_info(env)->mdi_buf[0];
1594         struct lu_buf *buf_vic = &mdd_env_info(env)->mdi_buf[1];
1595         struct lov_mds_md *lmm;
1596         struct thandle *handle;
1597         int rc, lock_order;
1598         ENTRY;
1599
1600         lock_order = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
1601         if (lock_order == 0) /* same fid */
1602                 RETURN(-EPERM);
1603
1604         handle = mdd_trans_create(env, mdd);
1605         if (IS_ERR(handle))
1606                 RETURN(PTR_ERR(handle));
1607
1608         /* get EA of victim file */
1609         memset(buf_vic, 0, sizeof(*buf_vic));
1610         rc = mdd_stripe_get(env, vic, buf_vic, XATTR_NAME_LOV);
1611         if (rc < 0) {
1612                 if (rc == -ENODATA)
1613                         rc = 0;
1614                 GOTO(stop, rc);
1615         }
1616
1617         /* parse the layout of victim file */
1618         lmm = buf_vic->lb_buf;
1619         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
1620                 GOTO(stop, rc = -EINVAL);
1621
1622         /* save EA of target file for restore */
1623         memset(buf, 0, sizeof(*buf));
1624         rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
1625         if (rc < 0)
1626                 GOTO(stop, rc);
1627
1628         /* Get rid of the layout from victim object */
1629         rc = mdd_declare_xattr_del(env, mdd, vic, XATTR_NAME_LOV, handle);
1630         if (rc)
1631                 GOTO(stop, rc);
1632
1633         rc = mdd_declare_xattr_set(env, mdd, obj, buf_vic, XATTR_NAME_LOV,
1634                                    LU_XATTR_MERGE, handle);
1635         if (rc)
1636                 GOTO(stop, rc);
1637
1638         rc = mdd_trans_start(env, mdd, handle);
1639         if (rc != 0)
1640                 GOTO(stop, rc);
1641
1642         if (lock_order > 0) {
1643                 mdd_write_lock(env, obj, DT_TGT_CHILD);
1644                 mdd_write_lock(env, vic, DT_TGT_CHILD);
1645         } else {
1646                 mdd_write_lock(env, vic, DT_TGT_CHILD);
1647                 mdd_write_lock(env, obj, DT_TGT_CHILD);
1648         }
1649
1650         rc = mdo_xattr_set(env, obj, buf_vic, XATTR_NAME_LOV, LU_XATTR_MERGE,
1651                            handle);
1652         if (rc)
1653                 GOTO(out, rc);
1654
1655         rc = mdo_xattr_del(env, vic, XATTR_NAME_LOV, handle);
1656         if (rc)
1657                 GOTO(out_restore, rc);
1658
1659         (void)mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
1660                                        NULL);
1661         (void)mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle,
1662                                        NULL);
1663         EXIT;
1664
1665 out_restore:
1666         if (rc) {
1667                 int rc2 = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1668                                         LU_XATTR_REPLACE, handle);
1669                 if (rc2)
1670                         CERROR("%s: failed rollback of "DFID" layout: file state unknown: rc = %d\n",
1671                                mdd_obj_dev_name(obj),
1672                                PFID(mdd_object_fid(obj)), rc2);
1673         }
1674
1675 out:
1676         mdd_write_unlock(env, obj);
1677         mdd_write_unlock(env, vic);
1678 stop:
1679         mdd_trans_stop(env, mdd, rc, handle);
1680         lu_buf_free(buf);
1681         lu_buf_free(buf_vic);
1682
1683         if (!rc)
1684                 (void) mdd_object_pfid_replace(env, obj);
1685
1686         return rc;
1687 }
1688
1689 /**
1690  * Extract the mirror with specified mirror id, and store the splitted
1691  * mirror layout to @buf.
1692  *
1693  * \param[in] comp_v1   mirrored layout
1694  * \param[in] mirror_id the mirror with mirror_id to be extracted
1695  * \param[out] buf      store the layout excluding the extracted mirror,
1696  *                      caller free the buffer we allocated in this function
1697  * \param[out] buf_vic  store the extracted layout, caller free the buffer
1698  *                      we allocated in this function
1699  *
1700  * \retval      0 on success; < 0 if error happens
1701  */
1702 static int mdd_split_ea(struct lov_comp_md_v1 *comp_v1, __u16 mirror_id,
1703                         struct lu_buf *buf, struct lu_buf *buf_vic)
1704 {
1705         struct lov_comp_md_v1 *comp_rem;
1706         struct lov_comp_md_v1 *comp_vic;
1707         struct lov_comp_md_entry_v1 *entry;
1708         struct lov_comp_md_entry_v1 *entry_rem;
1709         struct lov_comp_md_entry_v1 *entry_vic;
1710         __u16 mirror_cnt;
1711         __u16 comp_cnt, count = 0;
1712         int lmm_size, lmm_size_vic = 0;
1713         int i, j, k;
1714         int offset, offset_rem, offset_vic;
1715
1716         mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
1717         /* comp_v1 should contains more than 1 mirror */
1718         if (mirror_cnt <= 1)
1719                 return -EINVAL;
1720         comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
1721         lmm_size = le32_to_cpu(comp_v1->lcm_size);
1722
1723         for (i = 0; i < comp_cnt; i++) {
1724                 entry = &comp_v1->lcm_entries[i];
1725                 if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id) {
1726                         count++;
1727                         lmm_size_vic += sizeof(*entry);
1728                         lmm_size_vic += le32_to_cpu(entry->lcme_size);
1729                 } else if (count > 0) {
1730                         /* find the specified mirror */
1731                         break;
1732                 }
1733         }
1734
1735         if (count == 0)
1736                 return -EINVAL;
1737
1738         lu_buf_alloc(buf, lmm_size - lmm_size_vic);
1739         if (!buf->lb_buf)
1740                 return -ENOMEM;
1741
1742         lu_buf_alloc(buf_vic, sizeof(*comp_vic) + lmm_size_vic);
1743         if (!buf_vic->lb_buf) {
1744                 lu_buf_free(buf);
1745                 return -ENOMEM;
1746         }
1747
1748         comp_rem = (struct lov_comp_md_v1 *)buf->lb_buf;
1749         comp_vic = (struct lov_comp_md_v1 *)buf_vic->lb_buf;
1750
1751         memcpy(comp_rem, comp_v1, sizeof(*comp_v1));
1752         comp_rem->lcm_mirror_count = cpu_to_le16(mirror_cnt - 2);
1753         comp_rem->lcm_entry_count = cpu_to_le32(comp_cnt - count);
1754         comp_rem->lcm_size = cpu_to_le32(lmm_size - lmm_size_vic);
1755         if (!comp_rem->lcm_mirror_count)
1756                 comp_rem->lcm_flags = cpu_to_le16(LCM_FL_NONE);
1757
1758         memset(comp_vic, 0, sizeof(*comp_v1));
1759         comp_vic->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
1760         comp_vic->lcm_mirror_count = 0;
1761         comp_vic->lcm_entry_count = cpu_to_le32(count);
1762         comp_vic->lcm_size = cpu_to_le32(lmm_size_vic + sizeof(*comp_vic));
1763         comp_vic->lcm_flags = cpu_to_le16(LCM_FL_NONE);
1764         comp_vic->lcm_layout_gen = 0;
1765
1766         offset = sizeof(*comp_v1) + sizeof(*entry) * comp_cnt;
1767         offset_rem = sizeof(*comp_rem) +
1768                      sizeof(*entry_rem) * (comp_cnt - count);
1769         offset_vic = sizeof(*comp_vic) + sizeof(*entry_vic) * count;
1770         for (i = j = k = 0; i < comp_cnt; i++) {
1771                 struct lov_mds_md *lmm, *lmm_dst;
1772                 bool vic = false;
1773
1774                 entry = &comp_v1->lcm_entries[i];
1775                 entry_vic = &comp_vic->lcm_entries[j];
1776                 entry_rem = &comp_rem->lcm_entries[k];
1777
1778                 if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id)
1779                         vic = true;
1780
1781                 /* copy component entry */
1782                 if (vic) {
1783                         memcpy(entry_vic, entry, sizeof(*entry));
1784                         entry_vic->lcme_flags &= cpu_to_le32(LCME_FL_INIT);
1785                         entry_vic->lcme_offset = cpu_to_le32(offset_vic);
1786                         j++;
1787                 } else {
1788                         memcpy(entry_rem, entry, sizeof(*entry));
1789                         entry_rem->lcme_offset = cpu_to_le32(offset_rem);
1790                         k++;
1791                 }
1792
1793                 lmm = (struct lov_mds_md *)((char *)comp_v1 + offset);
1794                 if (vic)
1795                         lmm_dst = (struct lov_mds_md *)
1796                                         ((char *)comp_vic + offset_vic);
1797                 else
1798                         lmm_dst = (struct lov_mds_md *)
1799                                         ((char *)comp_rem + offset_rem);
1800
1801                 /* copy component entry blob */
1802                 memcpy(lmm_dst, lmm, le32_to_cpu(entry->lcme_size));
1803
1804                 /* blob offset advance */
1805                 offset += le32_to_cpu(entry->lcme_size);
1806                 if (vic)
1807                         offset_vic += le32_to_cpu(entry->lcme_size);
1808                 else
1809                         offset_rem += le32_to_cpu(entry->lcme_size);
1810         }
1811
1812         return 0;
1813 }
1814
1815 static int mdd_dom_data_truncate(const struct lu_env *env,
1816                                  struct mdd_device *mdd, struct mdd_object *mo);
1817
1818 static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
1819                            struct md_rejig_data *mrd)
1820 {
1821         struct mdd_device *mdd = mdo2mdd(md_obj);
1822         struct mdd_object *obj = md2mdd_obj(md_obj);
1823         struct mdd_object *vic = NULL;
1824         struct lu_buf *buf = &mdd_env_info(env)->mdi_buf[0];
1825         struct lu_buf *buf_save = &mdd_env_info(env)->mdi_buf[1];
1826         struct lu_buf *buf_vic = &mdd_env_info(env)->mdi_buf[2];
1827         struct lov_comp_md_v1 *lcm;
1828         struct thandle *handle;
1829         int rc;
1830         bool dom_stripe = false;
1831
1832         ENTRY;
1833
1834         /**
1835          * NULL @mrd_obj means mirror deleting, and use NULL vic to indicate
1836          * mirror deleting
1837          */
1838         if (mrd->mrd_obj)
1839                 vic = md2mdd_obj(mrd->mrd_obj);
1840
1841         handle = mdd_trans_create(env, mdd);
1842         if (IS_ERR(handle))
1843                 RETURN(PTR_ERR(handle));
1844
1845         /* get EA of mirrored file */
1846         memset(buf_save, 0, sizeof(*buf));
1847         rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
1848         if (rc < 0)
1849                 GOTO(stop, rc);
1850
1851         lcm = buf_save->lb_buf;
1852         if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
1853                 GOTO(stop, rc = -EINVAL);
1854
1855         /**
1856          * Extract the mirror with specified mirror id, and store the splitted
1857          * mirror layout to the victim buffer.
1858          */
1859         memset(buf, 0, sizeof(*buf));
1860         memset(buf_vic, 0, sizeof(*buf_vic));
1861         rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
1862         if (rc < 0)
1863                 GOTO(stop, rc);
1864         /**
1865          * @buf stores layout w/o the specified mirror, @buf_vic stores the
1866          * splitted mirror
1867          */
1868
1869         dom_stripe = mdd_lmm_dom_size(buf_vic->lb_buf) > 0;
1870
1871         if (vic) {
1872                 /**
1873                  * non delete mirror split
1874                  *
1875                  * declare obj set remaining layout in @buf, will set obj's
1876                  * in-memory layout
1877                  */
1878                 rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
1879                                            LU_XATTR_SPLIT, handle);
1880                 if (rc)
1881                         GOTO(stop, rc);
1882
1883                 /* declare vic set splitted layout in @buf_vic */
1884                 rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
1885                                            XATTR_NAME_LOV, LU_XATTR_SPLIT,
1886                                            handle);
1887                 if (rc)
1888                         GOTO(stop, rc);
1889         } else {
1890                 /**
1891                  * declare delete mirror objects in @buf_vic, will change obj's
1892                  * in-memory layout
1893                  */
1894                 rc = mdd_declare_xattr_set(env, mdd, obj, buf_vic,
1895                                            XATTR_NAME_LOV, LU_XATTR_PURGE,
1896                                            handle);
1897                 if (rc)
1898                         GOTO(stop, rc);
1899
1900                 /* declare obj set remaining layout in @buf */
1901                 rc = mdd_declare_xattr_set(env, mdd, obj, buf,
1902                                            XATTR_NAME_LOV, LU_XATTR_SPLIT,
1903                                            handle);
1904                 if (rc)
1905                         GOTO(stop, rc);
1906         }
1907
1908         rc = mdd_trans_start(env, mdd, handle);
1909         if (rc)
1910                 GOTO(stop, rc);
1911
1912         if (vic) {
1913                 /* don't use the same file to save the splitted mirror */
1914                 rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
1915                 if (rc == 0)
1916                         GOTO(stop, rc = -EPERM);
1917
1918                 if (rc > 0) {
1919                         mdd_write_lock(env, obj, DT_TGT_CHILD);
1920                         mdd_write_lock(env, vic, DT_TGT_CHILD);
1921                 } else {
1922                         mdd_write_lock(env, vic, DT_TGT_CHILD);
1923                         mdd_write_lock(env, obj, DT_TGT_CHILD);
1924                 }
1925         } else {
1926                 mdd_write_lock(env, obj, DT_TGT_CHILD);
1927         }
1928
1929         /* set obj's layout in @buf */
1930         rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_SPLIT,
1931                            handle);
1932         if (rc)
1933                 GOTO(unlock, rc);
1934
1935         if (vic) {
1936                 /* set vic's layout in @buf_vic */
1937                 rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV,
1938                                    LU_XATTR_CREATE, handle);
1939                 if (rc)
1940                         GOTO(out_restore, rc);
1941         } else {
1942                 /* delete mirror objects */
1943                 rc = mdo_xattr_set(env, obj, buf_vic, XATTR_NAME_LOV,
1944                                    LU_XATTR_PURGE, handle);
1945                 if (rc)
1946                         GOTO(out_restore, rc);
1947         }
1948
1949         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
1950                                       NULL);
1951         if (rc)
1952                 GOTO(out_restore, rc);
1953
1954         if (vic) {
1955                 rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
1956                                               handle, NULL);
1957                 if (rc)
1958                         GOTO(out_restore, rc);
1959         }
1960
1961 out_restore:
1962         if (rc) {
1963                 /* restore obj's in-memory and on-disk layout */
1964                 int rc2 = mdo_xattr_set(env, obj, buf_save, XATTR_NAME_LOV,
1965                                         LU_XATTR_REPLACE, handle);
1966                 if (rc2)
1967                         CERROR("%s: failed rollback "DFID
1968                                " layout: file state unknown: rc = %d\n",
1969                                mdd_obj_dev_name(obj),
1970                                PFID(mdd_object_fid(obj)), rc);
1971         }
1972
1973 unlock:
1974         mdd_write_unlock(env, obj);
1975         if (vic)
1976                 mdd_write_unlock(env, vic);
1977 stop:
1978         rc = mdd_trans_stop(env, mdd, rc, handle);
1979
1980         /* Truncate local DOM data if all went well */
1981         if (!rc && dom_stripe)
1982                 mdd_dom_data_truncate(env, mdd, obj);
1983
1984         lu_buf_free(buf_save);
1985         lu_buf_free(buf);
1986         lu_buf_free(buf_vic);
1987
1988         if (!rc)
1989                 (void) mdd_object_pfid_replace(env, obj);
1990
1991         return rc;
1992 }
1993
1994 static int mdd_layout_merge_allowed(const struct lu_env *env,
1995                                     struct md_object *target,
1996                                     struct md_object *victim)
1997 {
1998         struct mdd_object *o1 = md2mdd_obj(target);
1999
2000         /* cannot extend directory's LOVEA */
2001         if (S_ISDIR(mdd_object_type(o1))) {
2002                 CERROR("%s: Don't extend directory's LOVEA, just set it.\n",
2003                        mdd_obj_dev_name(o1));
2004                 RETURN(-EISDIR);
2005         }
2006
2007         RETURN(0);
2008 }
2009
2010 /**
2011  * The caller should guarantee to update the object ctime
2012  * after xattr_set if needed.
2013  */
2014 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
2015                          const struct lu_buf *buf, const char *name,
2016                          int fl)
2017 {
2018         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2019         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
2020         struct mdd_device *mdd = mdo2mdd(obj);
2021         struct thandle *handle;
2022         enum changelog_rec_type  cl_type;
2023         enum changelog_rec_flags clf_flags = 0;
2024         int rc;
2025         ENTRY;
2026
2027         rc = mdd_la_get(env, mdd_obj, attr);
2028         if (rc)
2029                 RETURN(rc);
2030
2031         rc = mdd_xattr_sanity_check(env, mdd_obj, attr, name);
2032         if (rc)
2033                 RETURN(rc);
2034
2035         if (strcmp(name, XATTR_LUSTRE_LOV) == 0 &&
2036             (fl == LU_XATTR_MERGE || fl == LU_XATTR_SPLIT)) {
2037                 struct md_rejig_data *mrd = buf->lb_buf;
2038                 struct md_object *victim = mrd->mrd_obj;
2039
2040                 if (buf->lb_len != sizeof(*mrd))
2041                         RETURN(-EINVAL);
2042
2043
2044                 if (fl == LU_XATTR_MERGE) {
2045                         rc = mdd_layout_merge_allowed(env, obj, victim);
2046                         if (rc)
2047                                 RETURN(rc);
2048                         /* merge layout of victim as a mirror of obj's. */
2049                         rc = mdd_xattr_merge(env, obj, victim);
2050                 } else {
2051                         rc = mdd_xattr_split(env, obj, mrd);
2052                 }
2053                 RETURN(rc);
2054         }
2055
2056         if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
2057             strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
2058                 struct posix_acl *acl;
2059
2060                 /* user may set empty ACL, which should be treated as removing
2061                  * ACL. */
2062                 acl = posix_acl_from_xattr(&init_user_ns, buf->lb_buf,
2063                                            buf->lb_len);
2064                 if (IS_ERR(acl))
2065                         RETURN(PTR_ERR(acl));
2066                 if (acl == NULL) {
2067                         rc = mdd_xattr_del(env, obj, name);
2068                         RETURN(rc);
2069                 }
2070                 posix_acl_release(acl);
2071         }
2072
2073         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
2074                 rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
2075                 RETURN(rc);
2076         }
2077
2078         handle = mdd_trans_create(env, mdd);
2079         if (IS_ERR(handle))
2080                 RETURN(PTR_ERR(handle));
2081
2082         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, fl, handle);
2083         if (rc)
2084                 GOTO(stop, rc);
2085
2086         rc = mdd_trans_start(env, mdd, handle);
2087         if (rc)
2088                 GOTO(stop, rc);
2089
2090         mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
2091
2092         if (strcmp(XATTR_NAME_HSM, name) == 0) {
2093                 rc = mdd_hsm_update_locked(env, obj, buf, handle, &clf_flags);
2094                 if (rc) {
2095                         mdd_write_unlock(env, mdd_obj);
2096                         GOTO(stop, rc);
2097                 }
2098         }
2099
2100         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle);
2101         mdd_write_unlock(env, mdd_obj);
2102         if (rc)
2103                 GOTO(stop, rc);
2104
2105         cl_type = mdd_xattr_changelog_type(env, mdd, name);
2106         if (cl_type < 0)
2107                 GOTO(stop, rc = 0);
2108
2109         rc = mdd_changelog_data_store_xattr(env, mdd, cl_type, clf_flags,
2110                                             mdd_obj, name, handle);
2111
2112         EXIT;
2113 stop:
2114         return mdd_trans_stop(env, mdd, rc, handle);
2115 }
2116
2117 static int mdd_declare_xattr_del(const struct lu_env *env,
2118                                  struct mdd_device *mdd,
2119                                  struct mdd_object *obj,
2120                                  const char *name,
2121                                  struct thandle *handle)
2122 {
2123         enum changelog_rec_type type;
2124         int rc;
2125
2126         rc = mdo_declare_xattr_del(env, obj, name, handle);
2127         if (rc)
2128                 return rc;
2129
2130         type = mdd_xattr_changelog_type(env, mdd, name);
2131         if (type < 0)
2132                 return 0; /* no changelog to store */
2133
2134         return mdd_declare_changelog_store(env, mdd, type, NULL, NULL, handle);
2135 }
2136
2137 /**
2138  * The caller should guarantee to update the object ctime
2139  * after xattr_set if needed.
2140  */
2141 static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
2142                          const char *name)
2143 {
2144         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2145         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
2146         struct mdd_device *mdd = mdo2mdd(obj);
2147         struct thandle *handle;
2148         int rc;
2149         ENTRY;
2150
2151         rc = mdd_la_get(env, mdd_obj, attr);
2152         if (rc)
2153                 RETURN(rc);
2154
2155         rc = mdd_xattr_sanity_check(env, mdd_obj, attr, name);
2156         if (rc)
2157                 RETURN(rc);
2158
2159         handle = mdd_trans_create(env, mdd);
2160         if (IS_ERR(handle))
2161                 RETURN(PTR_ERR(handle));
2162
2163         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
2164         if (rc)
2165                 GOTO(stop, rc);
2166
2167         rc = mdd_trans_start(env, mdd, handle);
2168         if (rc)
2169                 GOTO(stop, rc);
2170
2171         mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
2172         rc = mdo_xattr_del(env, mdd_obj, name, handle);
2173         mdd_write_unlock(env, mdd_obj);
2174         if (rc)
2175                 GOTO(stop, rc);
2176
2177         if (mdd_xattr_changelog_type(env, mdd, name) < 0)
2178                 GOTO(stop, rc = 0);
2179
2180         rc = mdd_changelog_data_store_xattr(env, mdd, CL_SETXATTR, 0, mdd_obj,
2181                                             name, handle);
2182
2183         EXIT;
2184 stop:
2185         return mdd_trans_stop(env, mdd, rc, handle);
2186 }
2187
2188 /*
2189  * read lov/lmv EA of an object
2190  * return the lov/lmv EA in an allocated lu_buf
2191  */
2192 int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
2193                    struct lu_buf *lmm_buf, const char *name)
2194 {
2195         struct lu_buf *buf = &mdd_env_info(env)->mdi_big_buf;
2196         int rc;
2197
2198         ENTRY;
2199
2200         if (buf->lb_buf == NULL) {
2201                 buf = lu_buf_check_and_alloc(buf, 4096);
2202                 if (buf->lb_buf == NULL)
2203                         RETURN(-ENOMEM);
2204         }
2205
2206 repeat:
2207         rc = mdo_xattr_get(env, obj, buf, name);
2208         if (rc == -ERANGE) {
2209                 /* mdi_big_buf is allocated but is too small
2210                  * we need to increase it */
2211                 buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mdi_big_buf,
2212                                              buf->lb_len * 2);
2213                 if (buf->lb_buf == NULL)
2214                         RETURN(-ENOMEM);
2215                 goto repeat;
2216         } else if (rc < 0) {
2217                 RETURN(rc);
2218         } else if (rc == 0) {
2219                 RETURN(-ENODATA);
2220         }
2221
2222         lu_buf_alloc(lmm_buf, rc);
2223         if (lmm_buf->lb_buf == NULL)
2224                 RETURN(-ENOMEM);
2225
2226         /*
2227          * we don't use lmm_buf directly, because we don't know xattr size, so
2228          * by using mdi_big_buf we can avoid calling mdo_xattr_get() twice.
2229          */
2230         memcpy(lmm_buf->lb_buf, buf->lb_buf, rc);
2231
2232         RETURN(0);
2233 }
2234
2235 static int mdd_xattr_hsm_replace(const struct lu_env *env,
2236                                  struct mdd_object *o, struct lu_buf *buf,
2237                                  struct thandle *handle)
2238 {
2239         struct hsm_attrs *attrs;
2240         enum hsm_states hsm_flags;
2241         enum changelog_rec_flags clf_flags = 0;
2242         int rc;
2243         ENTRY;
2244
2245         rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
2246                            handle);
2247         if (rc != 0)
2248                 RETURN(rc);
2249
2250         attrs = buf->lb_buf;
2251         hsm_flags = le32_to_cpu(attrs->hsm_flags);
2252         if (!(hsm_flags & HS_RELEASED) || mdd_is_dead_obj(o))
2253                 RETURN(0);
2254
2255         /* Add a changelog record for release. */
2256         hsm_set_cl_event(&clf_flags, HE_RELEASE);
2257         rc = mdd_changelog_data_store(env, mdo2mdd(&o->mod_obj), CL_HSM,
2258                                       clf_flags, o, handle, NULL);
2259         RETURN(rc);
2260 }
2261
2262 /*
2263  *  check if layout swapping between 2 objects is allowed
2264  *  the rules are:
2265  *  - only normal FIDs or non-system IGIFs
2266  *  - same type of objects
2267  *  - same owner/group (so quotas are still valid) unless this is from HSM
2268  *    release.
2269  */
2270 static int mdd_layout_swap_allowed(const struct lu_env *env,
2271                                    struct mdd_object *o1,
2272                                    const struct lu_attr *attr1,
2273                                    struct mdd_object *o2,
2274                                    const struct lu_attr *attr2,
2275                                    __u64 flags)
2276 {
2277         const struct lu_fid *fid1, *fid2;
2278         ENTRY;
2279
2280         fid1 = mdd_object_fid(o1);
2281         fid2 = mdd_object_fid(o2);
2282
2283         if (!fid_is_norm(fid1) &&
2284             (!fid_is_igif(fid1) || IS_ERR(mdd_links_get(env, o1))))
2285                 RETURN(-EBADF);
2286
2287         if (!fid_is_norm(fid2) &&
2288             (!fid_is_igif(fid2) || IS_ERR(mdd_links_get(env, o2))))
2289                 RETURN(-EBADF);
2290
2291         if (mdd_object_type(o1) != mdd_object_type(o2)) {
2292                 if (S_ISDIR(mdd_object_type(o1)))
2293                         RETURN(-ENOTDIR);
2294                 if (S_ISREG(mdd_object_type(o1)))
2295                         RETURN(-EISDIR);
2296                 RETURN(-EBADF);
2297         }
2298
2299         if (flags & SWAP_LAYOUTS_MDS_HSM)
2300                 RETURN(0);
2301
2302         if ((attr1->la_uid != attr2->la_uid) ||
2303             (attr1->la_gid != attr2->la_gid))
2304                 RETURN(-EPERM);
2305
2306         RETURN(0);
2307 }
2308
2309 /* XXX To set the proper lmm_oi & lmm_layout_gen when swap layouts, we have to
2310  *     look into the layout in MDD layer. */
2311 static int mdd_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi, bool get)
2312 {
2313         struct lov_comp_md_v1   *comp_v1;
2314         struct lov_mds_md       *v1;
2315         int                      i, ent_count;
2316         __u32                    off;
2317
2318         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
2319                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
2320                 ent_count = le16_to_cpu(comp_v1->lcm_entry_count);
2321
2322                 if (ent_count == 0)
2323                         return -EINVAL;
2324
2325                 if (get) {
2326                         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
2327                         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
2328                         *oi = v1->lmm_oi;
2329                 } else {
2330                         for (i = 0; i < le32_to_cpu(ent_count); i++) {
2331                                 off = le32_to_cpu(comp_v1->lcm_entries[i].
2332                                                 lcme_offset);
2333                                 v1 = (struct lov_mds_md *)((char *)comp_v1 +
2334                                                 off);
2335                                 v1->lmm_oi = *oi;
2336                         }
2337                 }
2338         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
2339                    le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
2340                 if (get)
2341                         *oi = lmm->lmm_oi;
2342                 else
2343                         lmm->lmm_oi = *oi;
2344         } else {
2345                 return -EINVAL;
2346         }
2347         return 0;
2348 }
2349
2350 static inline int mdd_get_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi)
2351 {
2352         return mdd_lmm_oi(lmm, oi, true);
2353 }
2354
2355 static inline int mdd_set_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi)
2356 {
2357         return mdd_lmm_oi(lmm, oi, false);
2358 }
2359
2360 static int mdd_lmm_gen(struct lov_mds_md *lmm, __u32 *gen, bool get)
2361 {
2362         struct lov_comp_md_v1 *comp_v1;
2363
2364         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
2365                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
2366                 if (get)
2367                         *gen = le32_to_cpu(comp_v1->lcm_layout_gen);
2368                 else
2369                         comp_v1->lcm_layout_gen = cpu_to_le32(*gen);
2370         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
2371                    le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
2372                 __u16 tmp_gen = *gen;
2373                 if (get)
2374                         *gen = le16_to_cpu(lmm->lmm_layout_gen);
2375                 else
2376                         lmm->lmm_layout_gen = cpu_to_le16(tmp_gen);
2377         } else {
2378                 return -EINVAL;
2379         }
2380         return 0;
2381 }
2382
2383 static inline int mdd_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
2384 {
2385         return mdd_lmm_gen(lmm, gen, true);
2386 }
2387
2388 static inline int mdd_set_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
2389 {
2390         return mdd_lmm_gen(lmm, gen, false);
2391 }
2392
2393 static int mdd_dom_data_truncate(const struct lu_env *env,
2394                                  struct mdd_device *mdd, struct mdd_object *mo)
2395 {
2396         struct thandle *th;
2397         struct dt_object *dom;
2398         int rc;
2399
2400         dom = dt_object_locate(mdd_object_child(mo), mdd->mdd_bottom);
2401         if (!dom)
2402                 GOTO(out, rc = -ENODATA);
2403
2404         th = dt_trans_create(env, mdd->mdd_bottom);
2405         if (IS_ERR(th))
2406                 GOTO(out, rc = PTR_ERR(th));
2407
2408         rc = dt_declare_punch(env, dom, 0, OBD_OBJECT_EOF, th);
2409         if (rc)
2410                 GOTO(stop, rc);
2411
2412         rc = dt_trans_start_local(env, mdd->mdd_bottom, th);
2413         if (rc != 0)
2414                 GOTO(stop, rc);
2415
2416         rc = dt_punch(env, dom, 0, OBD_OBJECT_EOF, th);
2417 stop:
2418         dt_trans_stop(env, mdd->mdd_bottom, th);
2419 out:
2420         /* Ignore failure but report the error */
2421         if (rc)
2422                 CERROR("%s: can't truncate DOM inode "DFID" data: rc = %d\n",
2423                        mdd_obj_dev_name(mo), PFID(mdd_object_fid(mo)), rc);
2424         return rc;
2425 }
2426
2427 /**
2428  * swap layouts between 2 lustre objects
2429  */
2430 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
2431                             struct md_object *obj2, __u64 flags)
2432 {
2433         struct mdd_thread_info *info = mdd_env_info(env);
2434         struct mdd_object *fst_o = md2mdd_obj(obj1);
2435         struct mdd_object *snd_o = md2mdd_obj(obj2);
2436         struct lu_attr *fst_la = MDD_ENV_VAR(env, cattr);
2437         struct lu_attr *snd_la = MDD_ENV_VAR(env, tattr);
2438         struct mdd_device *mdd = mdo2mdd(obj1);
2439         struct lov_mds_md *fst_lmm, *snd_lmm;
2440         struct lu_buf *fst_buf = &info->mdi_buf[0];
2441         struct lu_buf *snd_buf = &info->mdi_buf[1];
2442         struct lu_buf *fst_hsm_buf = &info->mdi_buf[2];
2443         struct lu_buf *snd_hsm_buf = &info->mdi_buf[3];
2444         struct ost_id *saved_oi = NULL;
2445         struct thandle *handle;
2446         struct mdd_object *dom_o = NULL;
2447         __u64 domsize_dom, domsize_vlt;
2448         __u32 fst_gen, snd_gen, saved_gen;
2449         int fst_fl;
2450         int rc, rc2;
2451
2452         ENTRY;
2453
2454         BUILD_BUG_ON(ARRAY_SIZE(info->mdi_buf) < 4);
2455         memset(info->mdi_buf, 0, sizeof(info->mdi_buf));
2456
2457         /* we have to sort the 2 obj, so locking will always
2458          * be in the same order, even in case of 2 concurrent swaps */
2459         rc = lu_fid_cmp(mdd_object_fid(fst_o), mdd_object_fid(snd_o));
2460         if (rc == 0) /* same fid ? */
2461                 RETURN(-EPERM);
2462
2463         if (rc < 0)
2464                 swap(fst_o, snd_o);
2465
2466         rc = mdd_la_get(env, fst_o, fst_la);
2467         if (rc != 0)
2468                 RETURN(rc);
2469
2470         rc = mdd_la_get(env, snd_o, snd_la);
2471         if (rc != 0)
2472                 RETURN(rc);
2473
2474         /* check if layout swapping is allowed */
2475         rc = mdd_layout_swap_allowed(env, fst_o, fst_la, snd_o, snd_la, flags);
2476         if (rc != 0)
2477                 RETURN(rc);
2478
2479         handle = mdd_trans_create(env, mdd);
2480         if (IS_ERR(handle))
2481                 RETURN(PTR_ERR(handle));
2482
2483         /* objects are already sorted */
2484         mdd_write_lock(env, fst_o, DT_TGT_CHILD);
2485         mdd_write_lock(env, snd_o, DT_TGT_CHILD);
2486
2487         rc = mdd_stripe_get(env, fst_o, fst_buf, XATTR_NAME_LOV);
2488         if (rc < 0 && rc != -ENODATA)
2489                 GOTO(stop, rc);
2490
2491         rc = mdd_stripe_get(env, snd_o, snd_buf, XATTR_NAME_LOV);
2492         if (rc < 0 && rc != -ENODATA)
2493                 GOTO(stop, rc);
2494
2495         /* check if file has DoM. DoM file can be migrated only to another
2496          * DoM layout with the same DoM component size or to an non-DOM
2497          * layout. After migration to OSTs layout, local MDT inode data
2498          * should be truncated.
2499          * Objects are sorted by FIDs, considering that original file's FID
2500          * is always smaller the snd_o is always original file we are migrating
2501          * from.
2502          */
2503         domsize_dom = mdd_lmm_dom_size(snd_buf->lb_buf);
2504         domsize_vlt = mdd_lmm_dom_size(fst_buf->lb_buf);
2505
2506         /* Only migration is supported for DoM files, not 'swap_layouts' so
2507          * target file must be volatile and orphan.
2508          */
2509         if (fst_o->mod_flags & (ORPHAN_OBJ | VOLATILE_OBJ)) {
2510                 dom_o = domsize_dom ? snd_o : NULL;
2511         } else if (snd_o->mod_flags & (ORPHAN_OBJ | VOLATILE_OBJ)) {
2512                 swap(domsize_dom, domsize_vlt);
2513                 dom_o = domsize_dom ? fst_o : NULL;
2514         } else if (domsize_dom > 0 || domsize_vlt > 0) {
2515                 /* 'lfs swap_layouts' case, neither file should have DoM */
2516                 rc = -EOPNOTSUPP;
2517                 CDEBUG(D_LAYOUT, "cannot swap layouts with DOM component, "
2518                        "use migration instead: rc = %d\n", rc);
2519                 GOTO(stop, rc);
2520         }
2521
2522         if (domsize_vlt > 0 && domsize_dom == 0) {
2523                 rc = -EOPNOTSUPP;
2524                 CDEBUG(D_LAYOUT,
2525                        "%s: cannot swap "DFID" layout: OST to DOM migration not supported: rc = %d\n",
2526                        mdd_obj_dev_name(snd_o),
2527                        PFID(mdd_object_fid(snd_o)), rc);
2528                 GOTO(stop, rc);
2529         } else if (domsize_vlt > 0 && domsize_dom != domsize_vlt) {
2530                 rc = -EOPNOTSUPP;
2531                 CDEBUG(D_LAYOUT,
2532                        "%s: cannot swap "DFID" layout: new layout must have same DoM component size: rc = %d\n",
2533                        mdd_obj_dev_name(fst_o),
2534                        PFID(mdd_object_fid(fst_o)), rc);
2535                 GOTO(stop, rc);
2536         } else if (domsize_vlt > 0) {
2537                 /* Migration with the same DOM component size, no need to
2538                  * truncate local data, it is still being used */
2539                 dom_o = NULL;
2540         }
2541
2542         /* swapping 2 non existant layouts is a success */
2543         if (fst_buf->lb_buf == NULL && snd_buf->lb_buf == NULL)
2544                 GOTO(stop, rc = 0);
2545
2546         /* to help inode migration between MDT, it is better to
2547          * start by the no layout file (if one), so we order the swap */
2548         if (snd_buf->lb_buf == NULL) {
2549                 swap(fst_o, snd_o);
2550                 swap(fst_buf, snd_buf);
2551         }
2552
2553         fst_gen = snd_gen = 0;
2554         /* lmm and generation layout initialization */
2555         if (fst_buf->lb_buf != NULL) {
2556                 fst_lmm = fst_buf->lb_buf;
2557                 mdd_get_lmm_gen(fst_lmm, &fst_gen);
2558                 fst_fl  = LU_XATTR_REPLACE;
2559         } else {
2560                 fst_lmm = NULL;
2561                 fst_gen = 0;
2562                 fst_fl  = LU_XATTR_CREATE;
2563         }
2564
2565         snd_lmm = snd_buf->lb_buf;
2566         mdd_get_lmm_gen(snd_lmm, &snd_gen);
2567
2568         saved_gen = fst_gen;
2569         /* increase the generation layout numbers */
2570         snd_gen++;
2571         fst_gen++;
2572
2573         /*
2574          * XXX The layout generation is used to generate component IDs for
2575          *     the composite file, we have to do some special tweaks to make
2576          *     sure the layout generation is always adequate for that job.
2577          */
2578
2579         /* Skip invalid generation number for composite layout */
2580         if ((snd_gen & LCME_ID_MASK) == 0)
2581                 snd_gen++;
2582         if ((fst_gen & LCME_ID_MASK) == 0)
2583                 fst_gen++;
2584         /* Make sure the generation is greater than all the component IDs */
2585         if (fst_gen < snd_gen)
2586                 fst_gen = snd_gen;
2587         else if (fst_gen > snd_gen)
2588                 snd_gen = fst_gen;
2589
2590         /* set the file specific informations in lmm */
2591         if (fst_lmm != NULL) {
2592                 struct ost_id temp_oi;
2593
2594                 saved_oi = &info->mdi_oa.o_oi;
2595                 mdd_get_lmm_oi(fst_lmm, saved_oi);
2596                 mdd_get_lmm_oi(snd_lmm, &temp_oi);
2597                 mdd_set_lmm_gen(fst_lmm, &snd_gen);
2598                 mdd_set_lmm_oi(fst_lmm, &temp_oi);
2599                 mdd_set_lmm_oi(snd_lmm, saved_oi);
2600         } else {
2601                 if ((snd_lmm->lmm_magic & cpu_to_le32(LOV_MAGIC_MASK)) ==
2602                     cpu_to_le32(LOV_MAGIC_MAGIC))
2603                         snd_lmm->lmm_magic |= cpu_to_le32(LOV_MAGIC_DEFINED);
2604                 else
2605                         GOTO(stop, rc = -EPROTO);
2606         }
2607         mdd_set_lmm_gen(snd_lmm, &fst_gen);
2608
2609         /* Prepare HSM attribute if it's required */
2610         if (flags & SWAP_LAYOUTS_MDS_HSM) {
2611                 const int buflen = sizeof(struct hsm_attrs);
2612
2613                 lu_buf_alloc(fst_hsm_buf, buflen);
2614                 lu_buf_alloc(snd_hsm_buf, buflen);
2615                 if (fst_hsm_buf->lb_buf == NULL || snd_hsm_buf->lb_buf == NULL)
2616                         GOTO(stop, rc = -ENOMEM);
2617
2618                 /* Read HSM attribute */
2619                 rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM);
2620                 if (rc < 0)
2621                         GOTO(stop, rc);
2622
2623                 rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM);
2624                 if (rc < 0)
2625                         GOTO(stop, rc);
2626
2627                 rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_hsm_buf,
2628                                            XATTR_NAME_HSM, LU_XATTR_REPLACE,
2629                                            handle);
2630                 if (rc < 0)
2631                         GOTO(stop, rc);
2632
2633                 rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_hsm_buf,
2634                                            XATTR_NAME_HSM, LU_XATTR_REPLACE,
2635                                            handle);
2636                 if (rc < 0)
2637                         GOTO(stop, rc);
2638         }
2639
2640         /* prepare transaction */
2641         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
2642                                    fst_fl, handle);
2643         if (rc != 0)
2644                 GOTO(stop, rc);
2645
2646         if (fst_buf->lb_buf != NULL)
2647                 rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf,
2648                                            XATTR_NAME_LOV, LU_XATTR_REPLACE,
2649                                            handle);
2650         else
2651                 rc = mdd_declare_xattr_del(env, mdd, snd_o, XATTR_NAME_LOV,
2652                                            handle);
2653         if (rc != 0)
2654                 GOTO(stop, rc);
2655
2656         rc = mdd_trans_start(env, mdd, handle);
2657         if (rc != 0)
2658                 GOTO(stop, rc);
2659
2660         if (flags & SWAP_LAYOUTS_MDS_HSM) {
2661                 rc = mdd_xattr_hsm_replace(env, fst_o, snd_hsm_buf, handle);
2662                 if (rc < 0)
2663                         GOTO(stop, rc);
2664
2665                 rc = mdd_xattr_hsm_replace(env, snd_o, fst_hsm_buf, handle);
2666                 if (rc < 0) {
2667                         rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
2668                                                     handle);
2669                         if (rc2 < 0)
2670                                 CERROR("%s: HSM error restoring "DFID": rc = %d/%d\n",
2671                                        mdd_obj_dev_name(fst_o),
2672                                        PFID(mdd_object_fid(fst_o)), rc, rc2);
2673                         GOTO(stop, rc);
2674                 }
2675         }
2676
2677         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle);
2678         if (rc != 0)
2679                 GOTO(stop, rc);
2680
2681         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_MDS_HSM_SWAP_LAYOUTS))) {
2682                 rc = -EOPNOTSUPP;
2683         } else {
2684                 if (fst_buf->lb_buf != NULL)
2685                         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
2686                                            LU_XATTR_REPLACE, handle);
2687                 else
2688                         rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle);
2689         }
2690         if (rc != 0)
2691                 GOTO(out_restore, rc);
2692
2693         /* Issue one changelog record per file */
2694         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, fst_o, handle,
2695                                       NULL);
2696         if (rc)
2697                 GOTO(stop, rc);
2698
2699         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, snd_o, handle,
2700                                       NULL);
2701         if (rc)
2702                 GOTO(stop, rc);
2703         EXIT;
2704
2705 out_restore:
2706         if (rc != 0) {
2707                 int steps = 0;
2708
2709                 /* failure on second file, but first was done, so we have
2710                  * to roll back first. */
2711                 if (fst_buf->lb_buf != NULL) {
2712                         mdd_set_lmm_oi(fst_lmm, saved_oi);
2713                         mdd_set_lmm_gen(fst_lmm, &saved_gen);
2714                         rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
2715                                             LU_XATTR_REPLACE, handle);
2716                 } else {
2717                         rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle);
2718                 }
2719                 if (rc2 < 0)
2720                         goto do_lbug;
2721
2722                 if (flags & SWAP_LAYOUTS_MDS_HSM) {
2723                         ++steps;
2724                         rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
2725                                                     handle);
2726                         if (rc2 < 0)
2727                                 goto do_lbug;
2728
2729                         ++steps;
2730                         rc2 = mdd_xattr_hsm_replace(env, snd_o, snd_hsm_buf,
2731                                                     handle);
2732                 }
2733
2734         do_lbug:
2735                 if (rc2 < 0) {
2736                         /* very bad day */
2737                         CERROR("%s: unable to roll back layout swap of "DFID" and "DFID", steps: %d: rc = %d/%d\n",
2738                                mdd_obj_dev_name(fst_o),
2739                                PFID(mdd_object_fid(snd_o)),
2740                                PFID(mdd_object_fid(fst_o)),
2741                                rc, rc2, steps);
2742                         /* a solution to avoid journal commit is to panic,
2743                          * but it has strong consequences so we use LBUG to
2744                          * allow sysdamin to choose to panic or not
2745                          */
2746                         LBUG();
2747                 }
2748         }
2749
2750 stop:
2751         rc = mdd_trans_stop(env, mdd, rc, handle);
2752
2753         /* Truncate local DOM data if all went well */
2754         if (!rc && dom_o)
2755                 mdd_dom_data_truncate(env, mdd, dom_o);
2756
2757         mdd_write_unlock(env, snd_o);
2758         mdd_write_unlock(env, fst_o);
2759
2760         lu_buf_free(fst_buf);
2761         lu_buf_free(snd_buf);
2762         lu_buf_free(fst_hsm_buf);
2763         lu_buf_free(snd_hsm_buf);
2764
2765         if (!rc) {
2766                 (void) mdd_object_pfid_replace(env, fst_o);
2767                 (void) mdd_object_pfid_replace(env, snd_o);
2768         }
2769         return rc;
2770 }
2771
2772 static int mdd_declare_layout_change(const struct lu_env *env,
2773                                      struct mdd_device *mdd,
2774                                      struct mdd_object *obj,
2775                                      struct md_layout_change *mlc,
2776                                      struct thandle *handle)
2777 {
2778         int rc;
2779
2780         rc = mdo_declare_layout_change(env, obj, mlc, handle);
2781         if (rc)
2782                 return rc;
2783
2784         return mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
2785                                            handle);
2786 }
2787
2788 /* For PFL, this is used to instantiate necessary component objects. */
2789 static int
2790 mdd_layout_instantiate_component(const struct lu_env *env,
2791                 struct mdd_object *obj, struct md_layout_change *mlc,
2792                 struct thandle *handle)
2793 {
2794         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2795         int rc;
2796         ENTRY;
2797
2798         if (mlc->mlc_opc != MD_LAYOUT_WRITE)
2799                 RETURN(-ENOTSUPP);
2800
2801         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2802         /**
2803          * It's possible that another layout write intent has already
2804          * instantiated our objects, so a -EALREADY returned, and we need to
2805          * do nothing.
2806          */
2807         if (rc)
2808                 RETURN(rc == -EALREADY ? 0 : rc);
2809
2810         rc = mdd_trans_start(env, mdd, handle);
2811         if (rc)
2812                 RETURN(rc);
2813
2814         mdd_write_lock(env, obj, DT_TGT_CHILD);
2815         rc = mdo_layout_change(env, obj, mlc, handle);
2816         mdd_write_unlock(env, obj);
2817         if (rc)
2818                 RETURN(rc);
2819
2820         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
2821                                       NULL);
2822         RETURN(rc);
2823 }
2824
2825 /**
2826  * Change the FLR layout from RDONLY to WRITE_PENDING.
2827  *
2828  * It picks the primary mirror, and bumps the layout version, and set
2829  * layout version xattr to OST objects in a sync tx. In order to facilitate
2830  * the handling of phantom writers from evicted clients, the clients carry
2831  * layout version of the file with write RPC, so that the OSTs can verify
2832  * if the write RPCs are legitimate, meaning not from evicted clients.
2833  */
2834 static int
2835 mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
2836                          struct md_layout_change *mlc, struct thandle *handle)
2837 {
2838         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2839         struct lu_buf *som_buf = &mdd_env_info(env)->mdi_buf[1];
2840         struct lustre_som_attrs *som = &mlc->mlc_som;
2841         int fl = 0;
2842         int rc;
2843         ENTRY;
2844
2845         /* Verify acceptable operations */
2846         switch (mlc->mlc_opc) {
2847         case MD_LAYOUT_WRITE:
2848         case MD_LAYOUT_RESYNC:
2849                 /* these are legal operations - this represents the case that
2850                  * a few mirrors were missed in the last resync. */
2851                 break;
2852         case MD_LAYOUT_RESYNC_DONE:
2853         default:
2854                 RETURN(0);
2855         }
2856
2857         som_buf->lb_buf = som;
2858         som_buf->lb_len = sizeof(*som);
2859         rc = mdo_xattr_get(env, obj, som_buf, XATTR_NAME_SOM);
2860         if (rc < 0 && rc != -ENODATA)
2861                 RETURN(rc);
2862
2863         if (rc > 0) {
2864                 lustre_som_swab(som);
2865                 if (som->lsa_valid & SOM_FL_STRICT)
2866                         fl = LU_XATTR_REPLACE;
2867
2868                 if (mlc->mlc_opc == MD_LAYOUT_WRITE &&
2869                     mlc->mlc_intent->li_extent.e_end > som->lsa_size) {
2870                         som->lsa_size = mlc->mlc_intent->li_extent.e_end + 1;
2871                         fl = LU_XATTR_REPLACE;
2872                 }
2873         }
2874
2875         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2876         if (rc)
2877                 GOTO(out, rc);
2878
2879         if (fl) {
2880                 rc = mdd_declare_xattr_set(env, mdd, obj, som_buf,
2881                                            XATTR_NAME_SOM, fl, handle);
2882                 if (rc)
2883                         GOTO(out, rc);
2884         }
2885
2886         /* record a changelog for data mover to consume */
2887         rc = mdd_declare_changelog_store(env, mdd, CL_FLRW, NULL, NULL, handle);
2888         if (rc)
2889                 GOTO(out, rc);
2890
2891         rc = mdd_trans_start(env, mdd, handle);
2892         if (rc)
2893                 GOTO(out, rc);
2894
2895         /* it needs a sync tx to make FLR to work properly */
2896         handle->th_sync = 1;
2897
2898         mdd_write_lock(env, obj, DT_TGT_CHILD);
2899         rc = mdo_layout_change(env, obj, mlc, handle);
2900         if (!rc && fl) {
2901                 /* SOM state transition from STRICT to STALE */
2902                 som->lsa_valid = SOM_FL_STALE;
2903                 lustre_som_swab(som);
2904                 rc = mdo_xattr_set(env, obj, som_buf, XATTR_NAME_SOM,
2905                                    fl, handle);
2906         }
2907         mdd_write_unlock(env, obj);
2908         if (rc)
2909                 GOTO(out, rc);
2910
2911         rc = mdd_changelog_data_store(env, mdd, CL_FLRW, 0, obj, handle, NULL);
2912         if (rc)
2913                 GOTO(out, rc);
2914
2915         EXIT;
2916
2917 out:
2918         return rc;
2919 }
2920
2921 /**
2922  * Handle mirrored file state transition when it's in WRITE_PENDING.
2923  *
2924  * Only MD_LAYOUT_RESYNC, which represents start of resync, is allowed when
2925  * the file is in WRITE_PENDING state. If everything goes fine, the file's
2926  * layout version will be increased, and the file's state will be changed to
2927  * SYNC_PENDING.
2928  */
2929 static int
2930 mdd_layout_update_write_pending(const struct lu_env *env,
2931                 struct mdd_object *obj, struct md_layout_change *mlc,
2932                 struct thandle *handle)
2933 {
2934         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2935         struct lu_buf *som_buf = &mdd_env_info(env)->mdi_buf[1];
2936         struct lustre_som_attrs *som = &mlc->mlc_som;
2937         int fl = 0;
2938         int rc;
2939         ENTRY;
2940
2941         switch (mlc->mlc_opc) {
2942         case MD_LAYOUT_RESYNC:
2943                 /* Upon receiving the resync request, it should
2944                  * instantiate all stale components right away to get ready
2945                  * for mirror copy. In order to avoid layout version change,
2946                  * client should avoid sending LAYOUT_WRITE request at the
2947                  * resync state. */
2948                 break;
2949         case MD_LAYOUT_WRITE:
2950                 /**
2951                  * legal race for concurrent write, the file state has been
2952                  * changed by another client. Or a jump over file size and
2953                  * write.
2954                  */
2955                 som_buf->lb_buf = som;
2956                 som_buf->lb_len = sizeof(*som);
2957                 rc = mdo_xattr_get(env, obj, som_buf, XATTR_NAME_SOM);
2958                 if (rc < 0 && rc != -ENODATA)
2959                         RETURN(rc);
2960
2961                 if (rc > 0) {
2962                         lustre_som_swab(som);
2963                         if (mlc->mlc_intent->li_extent.e_end > som->lsa_size) {
2964                                 som->lsa_size =
2965                                         mlc->mlc_intent->li_extent.e_end + 1;
2966                                 fl = LU_XATTR_REPLACE;
2967                         }
2968                 }
2969                 break;
2970         default:
2971                 RETURN(-EBUSY);
2972         }
2973
2974         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2975         if (rc)
2976                 GOTO(out, rc);
2977
2978         if (fl) {
2979                 rc = mdd_declare_xattr_set(env, mdd, obj, som_buf,
2980                                            XATTR_NAME_SOM, fl, handle);
2981                 if (rc)
2982                         GOTO(out, rc);
2983         }
2984
2985         rc = mdd_trans_start(env, mdd, handle);
2986         if (rc)
2987                 GOTO(out, rc);
2988
2989         /* it needs a sync tx to make FLR to work properly */
2990         handle->th_sync = 1;
2991
2992         mdd_write_lock(env, obj, DT_TGT_CHILD);
2993         rc = mdo_layout_change(env, obj, mlc, handle);
2994         if (!rc && fl) {
2995                 som->lsa_valid = SOM_FL_STALE;
2996                 lustre_som_swab(som);
2997                 rc = mdo_xattr_set(env, obj, som_buf, XATTR_NAME_SOM,
2998                                    fl, handle);
2999         }
3000         mdd_write_unlock(env, obj);
3001         if (rc)
3002                 GOTO(out, rc);
3003
3004         EXIT;
3005
3006 out:
3007         return rc;
3008 }
3009
3010 /**
3011  * Handle the requests when a FLR file's state is in SYNC_PENDING.
3012  *
3013  * Only concurrent write and sync complete requests are possible when the
3014  * file is in SYNC_PENDING. For the latter request, it will pass in the
3015  * mirrors that have been synchronized, then the stale bit will be cleared
3016  * to make the file's state turn into RDONLY.
3017  * For concurrent write reqeust, it just needs to change the file's state
3018  * to WRITE_PENDING in a sync tx. It doesn't have to change the layout
3019  * version because the version will be increased in the transition to
3020  * SYNC_PENDING later so that it can deny the write request from potential
3021  * evicted SYNC clients. */
3022 static int
3023 mdd_object_update_sync_pending(const struct lu_env *env, struct mdd_object *obj,
3024                 struct md_layout_change *mlc, struct thandle *handle)
3025 {
3026         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
3027         struct lu_buf *som_buf = &mdd_env_info(env)->mdi_buf[1];
3028         int fl = 0;
3029         int rc;
3030         ENTRY;
3031
3032         /* operation validation */
3033         switch (mlc->mlc_opc) {
3034         case MD_LAYOUT_RESYNC_DONE:
3035                 /* resync complete. */
3036         case MD_LAYOUT_WRITE:
3037                 /* concurrent write. */
3038                 break;
3039         case MD_LAYOUT_RESYNC:
3040                 /* resync again, most likely the previous run failed.
3041                  * no-op if it's already in SYNC_PENDING state */
3042                 RETURN(0);
3043         default:
3044                 RETURN(-EBUSY);
3045         }
3046
3047         if (mlc->mlc_som.lsa_valid & SOM_FL_STRICT) {
3048                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_SOM);
3049                 if (rc < 0 && rc != -ENODATA)
3050                         RETURN(rc);
3051
3052                 fl = rc == -ENODATA ? LU_XATTR_CREATE : LU_XATTR_REPLACE;
3053                 lustre_som_swab(&mlc->mlc_som);
3054                 som_buf->lb_buf = &mlc->mlc_som;
3055                 som_buf->lb_len = sizeof(mlc->mlc_som);
3056         }
3057
3058         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
3059         if (rc)
3060                 GOTO(out, rc);
3061
3062         /* record a changelog for the completion of resync */
3063         rc = mdd_declare_changelog_store(env, mdd, CL_RESYNC, NULL, NULL,
3064                                          handle);
3065         if (rc)
3066                 GOTO(out, rc);
3067
3068         /* RESYNC_DONE has piggybacked size and blocks */
3069         if (fl) {
3070                 rc = mdd_declare_xattr_set(env, mdd, obj, som_buf,
3071                                            XATTR_NAME_SOM, fl, handle);
3072                 if (rc)
3073                         GOTO(out, rc);
3074         }
3075
3076         rc = mdd_trans_start(env, mdd, handle);
3077         if (rc)
3078                 GOTO(out, rc);
3079
3080         /* it needs a sync tx to make FLR to work properly */
3081         handle->th_sync = 1;
3082
3083         rc = mdo_layout_change(env, obj, mlc, handle);
3084         if (rc)
3085                 GOTO(out, rc);
3086
3087         if (fl) {
3088                 rc = mdo_xattr_set(env, obj, som_buf, XATTR_NAME_SOM,
3089                                    fl, handle);
3090                 if (rc)
3091                         GOTO(out, rc);
3092         }
3093
3094         rc = mdd_changelog_data_store(env, mdd, CL_RESYNC, 0, obj, handle,
3095                                       NULL);
3096         if (rc)
3097                 GOTO(out, rc);
3098         EXIT;
3099 out:
3100         return rc;
3101 }
3102
3103 /**
3104  * Layout change callback for object.
3105  *
3106  * This is only used by FLR for now. In the future, it can be exteneded to
3107  * handle all layout change.
3108  */
3109 static int
3110 mdd_layout_change(const struct lu_env *env, struct md_object *o,
3111                   struct md_layout_change *mlc)
3112 {
3113         struct mdd_object       *obj = md2mdd_obj(o);
3114         struct mdd_device       *mdd = mdd_obj2mdd_dev(obj);
3115         struct lu_buf           *buf = mdd_buf_get(env, NULL, 0);
3116         struct lov_comp_md_v1   *lcm;
3117         struct thandle          *handle;
3118         int flr_state;
3119         int rc;
3120
3121         ENTRY;
3122
3123         if (S_ISDIR(mdd_object_type(obj))) {
3124                 switch (mlc->mlc_opc) {
3125                 case MD_LAYOUT_SHRINK:
3126                         rc = mdd_dir_layout_shrink(env, o, mlc);
3127                         break;
3128                 case MD_LAYOUT_SPLIT:
3129                         rc = mdd_dir_layout_split(env, o, mlc);
3130                         break;
3131                 default:
3132                         LBUG();
3133                 }
3134
3135                 RETURN(rc);
3136         }
3137
3138         /* Verify acceptable operations */
3139         switch (mlc->mlc_opc) {
3140         case MD_LAYOUT_WRITE:
3141         case MD_LAYOUT_RESYNC:
3142         case MD_LAYOUT_RESYNC_DONE:
3143                 break;
3144         default:
3145                 RETURN(-ENOTSUPP);
3146         }
3147
3148         handle = mdd_trans_create(env, mdd);
3149         if (IS_ERR(handle))
3150                 RETURN(PTR_ERR(handle));
3151
3152         rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
3153         if (rc < 0) {
3154                 if (rc == -ENODATA)
3155                         rc = -EINVAL;
3156                 GOTO(out, rc);
3157         }
3158
3159         /* analyze the layout to make sure it's a FLR file */
3160         lcm = buf->lb_buf;
3161         if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
3162                 GOTO(out, rc = -EINVAL);
3163
3164         flr_state = le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK;
3165
3166         /* please refer to HLD of FLR for state transition */
3167         switch (flr_state) {
3168         case LCM_FL_NONE:
3169                 rc = mdd_layout_instantiate_component(env, obj, mlc, handle);
3170                 break;
3171         case LCM_FL_WRITE_PENDING:
3172                 rc = mdd_layout_update_write_pending(env, obj, mlc, handle);
3173                 break;
3174         case LCM_FL_RDONLY:
3175                 rc = mdd_layout_update_rdonly(env, obj, mlc, handle);
3176                 break;
3177         case LCM_FL_SYNC_PENDING:
3178                 rc = mdd_object_update_sync_pending(env, obj, mlc, handle);
3179                 break;
3180         default:
3181                 rc = 0;
3182                 break;
3183         }
3184         EXIT;
3185
3186 out:
3187         mdd_trans_stop(env, mdd, rc, handle);
3188         lu_buf_free(buf);
3189         return rc;
3190 }
3191
3192 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
3193                           struct mdd_object *child, const struct lu_attr *attr,
3194                           const struct md_op_spec *spec,
3195                           struct dt_allocation_hint *hint)
3196 {
3197         struct dt_object *np = parent ?  mdd_object_child(parent) : NULL;
3198         struct mdd_device *mdd = mdd_obj2mdd_dev(child);
3199         struct dt_object *nc = mdd_object_child(child);
3200
3201         memset(hint, 0, sizeof(*hint));
3202
3203         /* For striped directory, give striping EA to lod_ah_init, which will
3204          * decide the stripe_offset and stripe count by it. */
3205         if (S_ISDIR(attr->la_mode) &&
3206             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
3207                 hint->dah_eadata = spec->u.sp_ea.eadata;
3208                 hint->dah_eadata_len = spec->u.sp_ea.eadatalen;
3209         } else if (S_ISREG(attr->la_mode) &&
3210                    spec->sp_cr_flags & MDS_OPEN_APPEND) {
3211                 hint->dah_append_stripe_count = mdd->mdd_append_stripe_count;
3212                 hint->dah_append_pool = mdd->mdd_append_pool;
3213         }
3214
3215         CDEBUG(D_INFO, DFID" eadata %p len %d\n", PFID(mdd_object_fid(child)),
3216                hint->dah_eadata, hint->dah_eadata_len);
3217         /* @hint will be initialized by underlying device. */
3218         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
3219 }
3220
3221 static int mdd_accmode(const struct lu_env *env, const struct lu_attr *la,
3222                        u64 open_flags)
3223 {
3224         /* Sadly, NFSD reopens a file repeatedly during operation, so the
3225          * "acc_mode = 0" allowance for newly-created files isn't honoured.
3226          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
3227          * owner can write to a file even if it is marked readonly to hide
3228          * its brokenness. (bug 5781) */
3229         if (open_flags & MDS_OPEN_OWNEROVERRIDE) {
3230                 struct lu_ucred *uc = lu_ucred_check(env);
3231
3232                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
3233                         return 0;
3234         }
3235
3236         return mds_accmode(open_flags);
3237 }
3238
3239 static int mdd_open_sanity_check(const struct lu_env *env,
3240                                  struct mdd_object *obj,
3241                                  const struct lu_attr *attr, u64 open_flags,
3242                                  int is_replay)
3243 {
3244         unsigned int may_mask;
3245         int rc;
3246         ENTRY;
3247
3248         /* EEXIST check, also opening of *open* orphans is allowed so we can
3249          * open-by-handle unlinked files
3250          */
3251         if (mdd_is_dead_obj(obj) && !is_replay &&
3252             likely(!(mdd_is_orphan_obj(obj) && obj->mod_count > 0)))
3253                 RETURN(-ENOENT);
3254
3255         if (S_ISLNK(attr->la_mode))
3256                 RETURN(-ELOOP);
3257
3258         may_mask = mdd_accmode(env, attr, open_flags);
3259
3260         if (S_ISDIR(attr->la_mode) && (may_mask & MAY_WRITE))
3261                 RETURN(-EISDIR);
3262
3263         if (!(open_flags & MDS_OPEN_CREATED)) {
3264                 rc = mdd_permission_internal(env, obj, attr, may_mask);
3265                 if (rc)
3266                         RETURN(rc);
3267         }
3268
3269         if (S_ISFIFO(attr->la_mode) || S_ISSOCK(attr->la_mode) ||
3270             S_ISBLK(attr->la_mode) || S_ISCHR(attr->la_mode))
3271                 open_flags &= ~MDS_OPEN_TRUNC;
3272
3273         /* For writing append-only file must open it with append mode. */
3274         if (attr->la_flags & LUSTRE_APPEND_FL) {
3275                 if ((open_flags & MDS_FMODE_WRITE) &&
3276                     !(open_flags & MDS_OPEN_APPEND))
3277                         RETURN(-EPERM);
3278                 if (open_flags & MDS_OPEN_TRUNC)
3279                         RETURN(-EPERM);
3280         }
3281
3282         RETURN(0);
3283 }
3284
3285 static int mdd_open(const struct lu_env *env, struct md_object *obj,
3286                     u64 open_flags, struct md_op_spec *spec)
3287 {
3288         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3289         struct md_device *md_dev = lu2md_dev(mdd2lu_dev(mdo2mdd(obj)));
3290         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
3291         struct mdd_object_user *mou = NULL;
3292         const struct lu_ucred *uc = lu_ucred(env);
3293         struct mdd_device *mdd = mdo2mdd(obj);
3294         enum changelog_rec_type type = CL_OPEN;
3295         int rc = 0;
3296         ENTRY;
3297
3298         mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
3299
3300         rc = mdd_la_get(env, mdd_obj, attr);
3301         if (rc != 0)
3302                 GOTO(out, rc);
3303
3304         rc = mdd_open_sanity_check(env, mdd_obj, attr, open_flags,
3305                                    spec->no_create);
3306         if ((rc == -EACCES) && (mdd->mdd_cl.mc_current_mask & BIT(CL_DN_OPEN)))
3307                 type = CL_DN_OPEN;
3308         else if (rc != 0)
3309                 GOTO(out, rc);
3310         else
3311                 mdd_obj->mod_count++;
3312
3313         if (!mdd_changelog_enabled(env, mdd, type))
3314                 GOTO(out, rc);
3315
3316 find:
3317         /* look for existing opener in list under mdd_write_lock */
3318         mou = mdd_obj_user_find(mdd_obj, uc->uc_uid, uc->uc_gid, open_flags);
3319
3320         if (!mou) {
3321                 int rc2;
3322
3323                 /* add user to list */
3324                 mou = mdd_obj_user_alloc(open_flags, uc->uc_uid, uc->uc_gid);
3325                 if (IS_ERR(mou)) {
3326                         if (rc == 0)
3327                                 rc = PTR_ERR(mou);
3328                         GOTO(out, rc);
3329                 }
3330                 rc2 = mdd_obj_user_add(mdd_obj, mou, type == CL_DN_OPEN);
3331                 if (rc2 != 0) {
3332                         mdd_obj_user_free(mou);
3333                         if (rc2 == -EEXIST)
3334                                 GOTO(find, rc2);
3335                 }
3336         } else {
3337                 if (type == CL_DN_OPEN) {
3338                         if (ktime_before(ktime_get(), mou->mou_deniednext))
3339                                 /* same user denied again same access within
3340                                  * time interval: do not record
3341                                  */
3342                                 GOTO(out, rc);
3343
3344                         /* this user already denied, but some time ago:
3345                          * update denied time
3346                          */
3347                         mou->mou_deniednext =
3348                                 ktime_add(ktime_get(),
3349                                           ktime_set(mdd->mdd_cl.mc_deniednext,
3350                                                     0));
3351                 } else {
3352                         mou->mou_opencount++;
3353                         /* same user opening file again with same flags:
3354                          * don't record
3355                          */
3356                         GOTO(out, rc);
3357                 }
3358         }
3359
3360         /* FYI, only the bottom 32 bits of open_flags are recorded */
3361         mdd_changelog(env, type, open_flags, md_dev, mdd_object_fid(mdd_obj));
3362
3363         EXIT;
3364 out:
3365         mdd_write_unlock(env, mdd_obj);
3366         return rc;
3367 }
3368
3369 static int mdd_declare_close(const struct lu_env *env, struct mdd_object *obj,
3370                              struct md_attr *ma, struct thandle *handle)
3371 {
3372         int rc;
3373
3374         rc = mdd_orphan_declare_delete(env, obj, handle);
3375         if (rc)
3376                 return rc;
3377
3378         return mdo_declare_destroy(env, obj, handle);
3379 }
3380
3381 /*
3382  * No permission check is needed.
3383  */
3384 static int mdd_close(const struct lu_env *env, struct md_object *obj,
3385                      struct md_attr *ma, u64 open_flags)
3386 {
3387         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3388         struct mdd_device *mdd = mdo2mdd(obj);
3389         struct thandle *handle = NULL;
3390         int is_orphan = 0;
3391         int rc;
3392         bool blocked = false;
3393         bool last_close_by_uid = false;
3394         const struct lu_ucred *uc = lu_ucred(env);
3395         ENTRY;
3396
3397         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
3398                 mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
3399                 mdd_obj->mod_count--;
3400                 mdd_write_unlock(env, mdd_obj);
3401
3402                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
3403                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
3404                                 "list\n", PFID(mdd_object_fid(mdd_obj)));
3405                 RETURN(0);
3406         }
3407
3408         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
3409          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
3410         /* check without any lock */
3411         is_orphan = mdd_obj->mod_count == 1 &&
3412                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
3413
3414 again:
3415         if (is_orphan) {
3416                 /* mdd_trans_create() maybe failed because of barrier_entry(),
3417                  * under such case, the orphan MDT-object will be left in the
3418                  * orphan list, and when the MDT remount next time, the unused
3419                  * orphans will be destroyed automatically.
3420                  *
3421                  * One exception: the former mdd_finish_unlink may failed to
3422                  * add the orphan MDT-object to the orphan list, then if the
3423                  * mdd_trans_create() failed because of barrier_entry(), the
3424                  * MDT-object will become real orphan that is neither in the
3425                  * namespace nor in the orphan list. Such bad case should be
3426                  * very rare and will be handled by e2fsck/lfsck. */
3427                 handle = mdd_trans_create(env, mdo2mdd(obj));
3428                 if (IS_ERR(handle)) {
3429                         rc = PTR_ERR(handle);
3430                         if (rc != -EINPROGRESS)
3431                                 GOTO(stop, rc);
3432
3433                         handle = NULL;
3434                         blocked = true;
3435                         goto cont;
3436                 }
3437
3438                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
3439                 if (rc)
3440                         GOTO(stop, rc);
3441
3442                 rc = mdd_declare_changelog_store(env, mdd, CL_CLOSE, NULL, NULL,
3443                                                  handle);
3444                 if (rc)
3445                         GOTO(stop, rc);
3446
3447                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
3448                 if (rc)
3449                         GOTO(stop, rc);
3450         }
3451
3452 cont:
3453         mdd_write_lock(env, mdd_obj, DT_TGT_CHILD);
3454         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
3455         if (rc != 0) {
3456                 CERROR("%s: failed to get lu_attr of "DFID": rc = %d\n",
3457                        lu_dev_name(mdd2lu_dev(mdd)),
3458                        PFID(mdd_object_fid(mdd_obj)), rc);
3459                 GOTO(out, rc);
3460         }
3461
3462         /* check again with lock */
3463         is_orphan = (mdd_obj->mod_count == 1) &&
3464                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
3465                      ma->ma_attr.la_nlink == 0);
3466
3467         if (is_orphan && !handle && !blocked) {
3468                 mdd_write_unlock(env, mdd_obj);
3469                 goto again;
3470         }
3471
3472         mdd_obj->mod_count--; /*release open count */
3473
3474         /* under mdd write lock */
3475         /* If recording, see if we need to remove UID from list. uc is not
3476          * initialized if the client has been evicted. */
3477         if (mdd_changelog_enabled(env, mdd, CL_OPEN) && uc) {
3478                 struct mdd_object_user *mou;
3479
3480                 /* look for UID in list */
3481                 /* If mou is NULL, it probably means logging was enabled after
3482                  * the user had the file open. So the corresponding close
3483                  * will not be logged.
3484                  */
3485                 mou = mdd_obj_user_find(mdd_obj, uc->uc_uid, uc->uc_gid,
3486                                         open_flags);
3487                 if (mou) {
3488                         mou->mou_opencount--;
3489                         if (mou->mou_opencount == 0) {
3490                                 mdd_obj_user_remove(mdd_obj, mou);
3491                                 last_close_by_uid = true;
3492                         }
3493                 }
3494         }
3495
3496         if (!is_orphan || blocked)
3497                 GOTO(out, rc = 0);
3498
3499         /* Orphan object */
3500         /* NB: Object maybe not in orphan list originally, it is rare case for
3501          * mdd_finish_unlink() failure, in that case, the object doesn't have
3502          * ORPHAN_OBJ flag */
3503         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
3504                 /* remove link to object from orphan index */
3505                 LASSERT(handle != NULL);
3506                 rc = mdd_orphan_delete(env, mdd_obj, handle);
3507                 if (rc != 0) {
3508                         CERROR("%s: unable to delete "DFID" from orphan list: "
3509                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
3510                                PFID(mdd_object_fid(mdd_obj)), rc);
3511                         /* If object was not deleted from orphan list, do not
3512                          * destroy OSS objects, which will be done when next
3513                          * recovery. */
3514                         GOTO(out, rc);
3515                 }
3516
3517                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
3518                        "list, OSS objects to be destroyed.\n",
3519                        PFID(mdd_object_fid(mdd_obj)));
3520         }
3521
3522         rc = mdo_destroy(env, mdd_obj, handle);
3523
3524         if (rc != 0) {
3525                 CERROR("%s: unable to delete "DFID" from orphan list: "
3526                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
3527                        PFID(mdd_object_fid(mdd_obj)), rc);
3528         }
3529         EXIT;
3530
3531 out:
3532         mdd_write_unlock(env, mdd_obj);
3533
3534         if (rc != 0 || blocked ||
3535             !mdd_changelog_enabled(env, mdd, CL_CLOSE))
3536                 GOTO(stop, rc);
3537
3538         /* Record CL_CLOSE in changelog only if file was opened in write mode,
3539          * or if CL_OPEN was recorded and it's last close by user.
3540          * Changelogs mask may change between open and close operations, but
3541          * this is not a big deal if we have a CL_CLOSE entry with no matching
3542          * CL_OPEN. Plus Changelogs mask may not change often.
3543          */
3544         if (((!(mdd->mdd_cl.mc_current_mask & BIT(CL_OPEN)) &&
3545               (open_flags & (MDS_FMODE_WRITE | MDS_OPEN_APPEND |
3546                              MDS_OPEN_TRUNC))) ||
3547              ((mdd->mdd_cl.mc_current_mask & BIT(CL_OPEN)) &&
3548               last_close_by_uid)) &&
3549             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
3550                 if (handle == NULL) {
3551                         handle = mdd_trans_create(env, mdo2mdd(obj));
3552                         if (IS_ERR(handle))
3553                                 GOTO(stop, rc = PTR_ERR(handle));
3554
3555                         rc = mdd_declare_changelog_store(env, mdd, CL_CLOSE,
3556                                                          NULL, NULL, handle);
3557                         if (rc)
3558                                 GOTO(stop, rc);
3559
3560                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
3561                         if (rc)
3562                                 GOTO(stop, rc);
3563                 }
3564
3565                 /* FYI, only the bottom 32 bits of open_flags are recorded */
3566                 mdd_changelog_data_store(env, mdd, CL_CLOSE, open_flags,
3567                                          mdd_obj, handle, NULL);
3568         }
3569
3570 stop:
3571         if (handle != NULL && !IS_ERR(handle))
3572                 rc = mdd_trans_stop(env, mdd, rc, handle);
3573
3574         return rc;
3575 }
3576
3577 /*
3578  * Permission check is done when open,
3579  * no need check again.
3580  */
3581 static int mdd_readpage_sanity_check(const struct lu_env *env,
3582                                      struct mdd_object *obj)
3583 {
3584         if (!dt_try_as_dir(env, mdd_object_child(obj), true))
3585                 return -ENOTDIR;
3586
3587         return 0;
3588 }
3589
3590 static int mdd_dir_page_build(const struct lu_env *env, struct dt_object *obj,
3591                               union lu_page *lp, size_t bytes,
3592                               const struct dt_it_ops *iops,
3593                               struct dt_it *it, __u32 attr, void *arg)
3594 {
3595         struct lu_dirpage *dp = &lp->lp_dir;
3596         void *area = dp;
3597         __u64 hash = 0;
3598         struct lu_dirent *ent;
3599         struct lu_dirent *last = NULL;
3600         struct lu_fid fid;
3601         int first = 1;
3602         int result;
3603
3604         if (bytes < sizeof(*dp))
3605                 GOTO(out_err, result = -EOVERFLOW);
3606
3607         memset(area, 0, sizeof(*dp));
3608         area += sizeof(*dp);
3609         bytes -= sizeof(*dp);
3610
3611         ent = area;
3612         do {
3613                 int len;
3614                 size_t recsize;
3615
3616                 len = iops->key_size(env, it);
3617
3618                 /* IAM iterator can return record with zero len. */
3619                 if (len == 0)
3620                         GOTO(next, 0);
3621
3622                 hash = iops->store(env, it);
3623                 if (unlikely(first)) {
3624                         first = 0;
3625                         dp->ldp_hash_start = cpu_to_le64(hash);
3626                 }
3627
3628                 /* calculate max space required for lu_dirent */
3629                 recsize = lu_dirent_calc_size(len, attr);
3630
3631                 if (bytes >= recsize &&
3632                     !OBD_FAIL_CHECK(OBD_FAIL_MDS_DIR_PAGE_WALK)) {
3633                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
3634                         if (result == -ESTALE)
3635                                 GOTO(next, result);
3636                         if (result != 0)
3637                                 GOTO(out, result);
3638
3639                         /* OSD might not able to pack all attributes, so
3640                          * recheck record length had room to store FID
3641                          */
3642                         recsize = le16_to_cpu(ent->lde_reclen);
3643
3644                         if (le32_to_cpu(ent->lde_attrs) & LUDA_FID) {
3645                                 fid_le_to_cpu(&fid, &ent->lde_fid);
3646                                 if (fid_is_dot_lustre(&fid))
3647                                         GOTO(next, recsize);
3648                         }
3649                 } else {
3650                         result = (last != NULL) ? 0 : -EBADSLT;
3651                         GOTO(out, result);
3652                 }
3653                 last = ent;
3654                 ent = (void *)ent + recsize;
3655                 bytes -= recsize;
3656
3657 next:
3658                 result = iops->next(env, it);
3659                 if (result == -ESTALE)
3660                         GOTO(next, result);
3661         } while (result == 0);
3662
3663 out:
3664         dp->ldp_hash_end = cpu_to_le64(hash);
3665         if (last != NULL) {
3666                 if (last->lde_hash == dp->ldp_hash_end)
3667                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
3668                 last->lde_reclen = 0; /* end mark */
3669         }
3670 out_err:
3671         if (result > 0)
3672                 /* end of directory */
3673                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
3674         else if (result < 0)
3675                 CWARN("%s: build page failed for "DFID": rc = %d\n",
3676                       lu_dev_name(obj->do_lu.lo_dev),
3677                       PFID(lu_object_fid(&obj->do_lu)), result);
3678
3679         return result;
3680 }
3681
3682 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
3683                  const struct lu_rdpg *rdpg)
3684 {
3685         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3686         int rc;
3687         ENTRY;
3688
3689         if (mdd_object_exists(mdd_obj) == 0) {
3690                 CERROR("%s: object "DFID" not found: rc = -2\n",
3691                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
3692                 return -ENOENT;
3693         }
3694
3695         mdd_read_lock(env, mdd_obj, DT_TGT_CHILD);
3696         rc = mdd_readpage_sanity_check(env, mdd_obj);
3697         if (rc)
3698                 GOTO(out_unlock, rc);
3699
3700         if (mdd_is_dead_obj(mdd_obj)) {
3701                 struct page *pg;
3702                 struct lu_dirpage *dp;
3703
3704                 /*
3705                  * According to POSIX, please do not return any entry to client:
3706                  * even dot and dotdot should not be returned.
3707                  */
3708                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
3709                        PFID(mdd_object_fid(mdd_obj)));
3710
3711                 if (rdpg->rp_count <= 0)
3712                         GOTO(out_unlock, rc = -EFAULT);
3713                 LASSERT(rdpg->rp_pages != NULL);
3714
3715                 pg = rdpg->rp_pages[0];
3716                 dp = (struct lu_dirpage *)kmap(pg);
3717                 memset(dp, 0 , sizeof(struct lu_dirpage));
3718                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
3719                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
3720                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
3721                 kunmap(pg);
3722                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
3723         }
3724
3725         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
3726                            mdd_dir_page_build, NULL);
3727         if (rc >= 0) {
3728                 struct lu_dirpage       *dp;
3729
3730                 dp = kmap(rdpg->rp_pages[0]);
3731                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
3732                 if (rc == 0) {
3733                         /*
3734                          * No pages were processed, mark this for first page
3735                          * and send back.
3736                          */
3737                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
3738                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
3739                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
3740                 }
3741                 kunmap(rdpg->rp_pages[0]);
3742         }
3743
3744         GOTO(out_unlock, rc);
3745 out_unlock:
3746         mdd_read_unlock(env, mdd_obj);
3747         return rc;
3748 }
3749
3750 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
3751 {
3752         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3753
3754         if (mdd_object_exists(mdd_obj) == 0) {
3755                 int rc = -ENOENT;
3756
3757                 CERROR("%s: object "DFID" not found: rc = %d\n",
3758                        mdd_obj_dev_name(mdd_obj),
3759                        PFID(mdd_object_fid(mdd_obj)), rc);
3760                 return rc;
3761         }
3762         return dt_object_sync(env, mdd_object_child(mdd_obj),
3763                               0, OBD_OBJECT_EOF);
3764 }
3765
3766 static int mdd_object_lock(const struct lu_env *env,
3767                            struct md_object *obj,
3768                            struct lustre_handle *lh,
3769                            struct ldlm_enqueue_info *einfo,
3770                            union ldlm_policy_data *policy)
3771 {
3772         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3773         return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
3774                               einfo, policy);
3775 }
3776
3777 static int mdd_object_unlock(const struct lu_env *env,
3778                              struct md_object *obj,
3779                              struct ldlm_enqueue_info *einfo,
3780                              union ldlm_policy_data *policy)
3781 {
3782         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3783         return dt_object_unlock(env, mdd_object_child(mdd_obj), einfo, policy);
3784 }
3785
3786 const struct md_object_operations mdd_obj_ops = {
3787         .moo_permission         = mdd_permission,
3788         .moo_attr_get           = mdd_attr_get,
3789         .moo_attr_set           = mdd_attr_set,
3790         .moo_xattr_get          = mdd_xattr_get,
3791         .moo_xattr_set          = mdd_xattr_set,
3792         .moo_xattr_list         = mdd_xattr_list,
3793         .moo_invalidate         = mdd_invalidate,
3794         .moo_xattr_del          = mdd_xattr_del,
3795         .moo_swap_layouts       = mdd_swap_layouts,
3796         .moo_open               = mdd_open,
3797         .moo_close              = mdd_close,
3798         .moo_readpage           = mdd_readpage,
3799         .moo_readlink           = mdd_readlink,
3800         .moo_changelog          = mdd_changelog,
3801         .moo_object_sync        = mdd_object_sync,
3802         .moo_object_lock        = mdd_object_lock,
3803         .moo_object_unlock      = mdd_object_unlock,
3804         .moo_layout_change      = mdd_layout_change,
3805 };