Whamcloud - gitweb
17642cac7490823dc10f230ae82506866d27e455
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_object.c
33  *
34  * Lustre Metadata Server (mdd) routines
35  *
36  * Author: Wang Di <wangdi@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <linux/module.h>
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <obd_support.h>
45 #include <lprocfs_status.h>
46 /* fid_be_cpu(), fid_cpu_to_be(). */
47 #include <lustre_fid.h>
48 #include <lustre_idmap.h>
49 #include <uapi/linux/lustre/lustre_param.h>
50 #include <lustre_mds.h>
51
52 #include "mdd_internal.h"
53
54 static const struct lu_object_operations mdd_lu_obj_ops;
55
56 static int mdd_xattr_get(const struct lu_env *env,
57                          struct md_object *obj, struct lu_buf *buf,
58                          const char *name);
59
60 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
61                struct lu_attr *la)
62 {
63         int rc;
64
65         if (mdd_object_exists(obj) == 0)
66                 return -ENOENT;
67
68         rc = mdo_attr_get(env, obj, la);
69         if (unlikely(rc != 0)) {
70                 if (rc == -ENOENT)
71                         obj->mod_flags |= DEAD_OBJ;
72                 return rc;
73         }
74
75         if (la->la_valid & LA_FLAGS &&
76             la->la_flags & LUSTRE_ORPHAN_FL)
77                 obj->mod_flags |= ORPHAN_OBJ | DEAD_OBJ;
78
79         return 0;
80 }
81
82 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
83 {
84         struct mdd_thread_info *info;
85
86         lu_env_refill((struct lu_env *)env);
87         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
88         LASSERT(info != NULL);
89         return info;
90 }
91
92 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
93 {
94         struct lu_buf *buf;
95
96         buf = &mdd_env_info(env)->mti_buf[0];
97         buf->lb_buf = area;
98         buf->lb_len = len;
99         return buf;
100 }
101
102 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
103                                        const void *area, ssize_t len)
104 {
105         struct lu_buf *buf;
106
107         buf = &mdd_env_info(env)->mti_buf[0];
108         buf->lb_buf = (void *)area;
109         buf->lb_len = len;
110         return buf;
111 }
112
113 struct lu_object *mdd_object_alloc(const struct lu_env *env,
114                                    const struct lu_object_header *hdr,
115                                    struct lu_device *d)
116 {
117         struct mdd_object *mdd_obj;
118
119         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, GFP_NOFS);
120         if (mdd_obj != NULL) {
121                 struct lu_object *o;
122
123                 o = mdd2lu_obj(mdd_obj);
124                 lu_object_init(o, NULL, d);
125                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
126                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
127                 mdd_obj->mod_count = 0;
128                 o->lo_ops = &mdd_lu_obj_ops;
129                 return o;
130         } else {
131                 return NULL;
132         }
133 }
134
135 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
136                            const struct lu_object_conf *unused)
137 {
138         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
139         struct mdd_object *mdd_obj = lu2mdd_obj(o);
140         struct lu_object  *below;
141         struct lu_device  *under;
142         ENTRY;
143
144         mdd_obj->mod_cltime = ktime_set(0, 0);
145         under = &d->mdd_child->dd_lu_dev;
146         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
147         if (IS_ERR(below))
148                 RETURN(PTR_ERR(below));
149
150         lu_object_add(o, below);
151
152         RETURN(0);
153 }
154
155 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
156 {
157         int rc = 0;
158
159         if (lu_object_exists(o)) {
160                 struct mdd_object *mdd_obj = lu2mdd_obj(o);
161                 struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
162
163                 rc = mdd_la_get(env, mdd_obj, attr);
164         }
165
166         return rc;
167 }
168
169 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
170 {
171         struct mdd_object *mdd = lu2mdd_obj(o);
172
173         lu_object_fini(o);
174         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
175 }
176
177 static int mdd_object_print(const struct lu_env *env, void *cookie,
178                             lu_printer_t p, const struct lu_object *o)
179 {
180         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
181
182         return (*p)(env, cookie,
183                     LUSTRE_MDD_NAME"-object@%p(open_count=%d, valid=%x, cltime=%lldns, flags=%lx)",
184                     mdd, mdd->mod_count, mdd->mod_valid,
185                     ktime_to_ns(mdd->mod_cltime), mdd->mod_flags);
186 }
187
188 static const struct lu_object_operations mdd_lu_obj_ops = {
189         .loo_object_init    = mdd_object_init,
190         .loo_object_start   = mdd_object_start,
191         .loo_object_free    = mdd_object_free,
192         .loo_object_print   = mdd_object_print,
193 };
194
195 struct mdd_object *mdd_object_find(const struct lu_env *env,
196                                    struct mdd_device *d,
197                                    const struct lu_fid *f)
198 {
199         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
200 }
201
202 /*
203  * No permission check is needed.
204  */
205 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
206                  struct md_attr *ma)
207 {
208         struct mdd_object *mdd_obj = md2mdd_obj(obj);
209         int               rc;
210
211         ENTRY;
212
213         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
214         if ((ma->ma_need & MA_INODE) != 0 && mdd_is_dead_obj(mdd_obj))
215                 ma->ma_attr.la_nlink = 0;
216
217         RETURN(rc);
218 }
219
220 /*
221  * No permission check is needed.
222  */
223 static int mdd_xattr_get(const struct lu_env *env,
224                          struct md_object *obj, struct lu_buf *buf,
225                          const char *name)
226 {
227         struct mdd_object *mdd_obj = md2mdd_obj(obj);
228         int rc;
229
230         ENTRY;
231
232         if (mdd_object_exists(mdd_obj) == 0) {
233                 CERROR("%s: object "DFID" not found: rc = -2\n",
234                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
235                 return -ENOENT;
236         }
237
238         /* If the object has been destroyed, then do not get LMVEA, because
239          * it needs to load stripes from the iteration of the master object,
240          * and it will cause problem if master object has been destroyed, see
241          * LU-6427 */
242         if (unlikely((mdd_obj->mod_flags & DEAD_OBJ) &&
243                      !(mdd_obj->mod_flags & ORPHAN_OBJ) &&
244                       strcmp(name, XATTR_NAME_LMV) == 0))
245                 RETURN(-ENOENT);
246
247         /* If the object has been delete from the namespace, then
248          * get linkEA should return -ENOENT as well */
249         if (unlikely((mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ)) &&
250                       strcmp(name, XATTR_NAME_LINK) == 0))
251                 RETURN(-ENOENT);
252
253         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
254         rc = mdo_xattr_get(env, mdd_obj, buf, name);
255         mdd_read_unlock(env, mdd_obj);
256
257         RETURN(rc);
258 }
259
260 /*
261  * Permission check is done when open,
262  * no need check again.
263  */
264 int mdd_readlink(const struct lu_env *env, struct md_object *obj,
265                  struct lu_buf *buf)
266 {
267         struct mdd_object *mdd_obj = md2mdd_obj(obj);
268         struct dt_object  *next;
269         loff_t             pos = 0;
270         int                rc;
271         ENTRY;
272
273         if (mdd_object_exists(mdd_obj) == 0) {
274                 CERROR("%s: object "DFID" not found: rc = -2\n",
275                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
276                 return -ENOENT;
277         }
278
279         next = mdd_object_child(mdd_obj);
280         LASSERT(next != NULL);
281         LASSERT(next->do_body_ops != NULL);
282         LASSERT(next->do_body_ops->dbo_read != NULL);
283         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
284         rc = dt_read(env, next, buf, &pos);
285         mdd_read_unlock(env, mdd_obj);
286         RETURN(rc);
287 }
288
289 /*
290  * No permission check is needed.
291  */
292 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
293                           struct lu_buf *buf)
294 {
295         struct mdd_object *mdd_obj = md2mdd_obj(obj);
296         int rc;
297
298         ENTRY;
299
300         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
301         rc = mdo_xattr_list(env, mdd_obj, buf);
302         mdd_read_unlock(env, mdd_obj);
303
304         /* If the buffer is NULL then we are only here to get the
305          * length of the xattr name list. */
306         if (rc < 0 || buf->lb_buf == NULL)
307                 RETURN(rc);
308
309         /*
310          * Filter out XATTR_NAME_LINK if this is an orphan object.  See
311          * mdd_xattr_get().
312          */
313         if (unlikely(mdd_obj->mod_flags & (DEAD_OBJ | ORPHAN_OBJ))) {
314                 char   *end = (char *)buf->lb_buf + rc;
315                 char   *p = buf->lb_buf;
316
317                 while (p < end) {
318                         char   *next = p + strlen(p) + 1;
319
320                         if (strcmp(p, XATTR_NAME_LINK) == 0) {
321                                 if (end - next > 0)
322                                         memmove(p, next, end - next);
323                                 rc -= next - p;
324                                 CDEBUG(D_INFO, "Filtered out "XATTR_NAME_LINK
325                                        " of orphan "DFID"\n",
326                                        PFID(mdd_object_fid(mdd_obj)));
327                                 break;
328                         }
329
330                         p = next;
331                 }
332         }
333
334         RETURN(rc);
335 }
336
337 int mdd_invalidate(const struct lu_env *env, struct md_object *obj)
338 {
339         return mdo_invalidate(env, md2mdd_obj(obj));
340 }
341
342 int mdd_declare_create_object_internal(const struct lu_env *env,
343                                        struct mdd_object *p,
344                                        struct mdd_object *c,
345                                        struct lu_attr *attr,
346                                        struct thandle *handle,
347                                        const struct md_op_spec *spec,
348                                        struct dt_allocation_hint *hint)
349 {
350         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
351         const struct dt_index_features *feat = spec->sp_feat;
352         int rc;
353         ENTRY;
354
355         if (feat != &dt_directory_features && feat != NULL) {
356                 dof->dof_type = DFT_INDEX;
357                 dof->u.dof_idx.di_feat = feat;
358         } else {
359                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
360                 if (dof->dof_type == DFT_REGULAR) {
361                         dof->u.dof_reg.striped =
362                                 md_should_create(spec->sp_cr_flags);
363                         if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
364                                 dof->u.dof_reg.striped = 0;
365                         /* is this replay? */
366                         if (spec->no_create)
367                                 dof->u.dof_reg.striped = 0;
368                 }
369         }
370
371         rc = mdo_declare_create_object(env, c, attr, hint, dof, handle);
372
373         RETURN(rc);
374 }
375
376 int mdd_create_object_internal(const struct lu_env *env, struct mdd_object *p,
377                                struct mdd_object *c, struct lu_attr *attr,
378                                struct thandle *handle,
379                                const struct md_op_spec *spec,
380                                struct dt_allocation_hint *hint)
381 {
382         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
383         int rc;
384         ENTRY;
385
386         LASSERT(!mdd_object_exists(c));
387
388         rc = mdo_create_object(env, c, attr, hint, dof, handle);
389
390         RETURN(rc);
391 }
392
393 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
394                           const struct lu_attr *attr, struct thandle *handle,
395                           int needacl)
396 {
397         int rc;
398         ENTRY;
399
400         rc = mdo_attr_set(env, obj, attr, handle);
401 #ifdef CONFIG_FS_POSIX_ACL
402         if (!rc && (attr->la_valid & LA_MODE) && needacl)
403                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
404 #endif
405         RETURN(rc);
406 }
407
408 int mdd_update_time(const struct lu_env *env, struct mdd_object *obj,
409                     const struct lu_attr *oattr, struct lu_attr *attr,
410                     struct thandle *handle)
411 {
412         int rc = 0;
413         ENTRY;
414
415         LASSERT(attr->la_valid & LA_CTIME);
416         LASSERT(oattr != NULL);
417
418         /* Make sure the ctime is increased only, however, it's not strictly
419          * reliable at here because there is not guarantee to hold lock on
420          * object, so we just bypass some unnecessary cmtime setting first
421          * and OSD has to check it again. */
422         if (attr->la_ctime < oattr->la_ctime)
423                 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
424         else if (attr->la_valid == LA_CTIME &&
425                  attr->la_ctime == oattr->la_ctime)
426                 attr->la_valid &= ~LA_CTIME;
427
428         if (attr->la_valid != 0)
429                 rc = mdd_attr_set_internal(env, obj, attr, handle, 0);
430         RETURN(rc);
431 }
432
433 /*
434  * This gives the same functionality as the code between
435  * sys_chmod and inode_setattr
436  * chown_common and inode_setattr
437  * utimes and inode_setattr
438  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
439  */
440 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
441                         const struct lu_attr *oattr, struct lu_attr *la,
442                         const unsigned long flags)
443 {
444         struct lu_ucred  *uc;
445         int               rc = 0;
446         ENTRY;
447
448         if (!la->la_valid)
449                 RETURN(0);
450
451         /* Do not permit change file type */
452         if (la->la_valid & LA_TYPE)
453                 RETURN(-EPERM);
454
455         /* They should not be processed by setattr */
456         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
457                 RETURN(-EPERM);
458
459         LASSERT(oattr != NULL);
460
461         uc = lu_ucred_check(env);
462         if (uc == NULL)
463                 RETURN(0);
464
465         if (la->la_valid == LA_CTIME) {
466                 if (!(flags & MDS_PERM_BYPASS))
467                         /* This is only for set ctime when rename's source is
468                          * on remote MDS. */
469                         rc = mdd_may_delete(env, NULL, NULL, obj, oattr, NULL,
470                                             1, 0);
471                 if (rc == 0 && la->la_ctime <= oattr->la_ctime)
472                         la->la_valid &= ~LA_CTIME;
473                 RETURN(rc);
474         }
475
476         if (la->la_valid == LA_ATIME) {
477                 /* This is an atime-only attribute update for close RPCs. */
478                 if (la->la_atime < (oattr->la_atime +
479                                 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
480                         la->la_valid &= ~LA_ATIME;
481                 RETURN(0);
482         }
483
484         /* Check if flags change. */
485         if (la->la_valid & LA_FLAGS) {
486                 unsigned int oldflags = oattr->la_flags &
487                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
488                 unsigned int newflags = la->la_flags &
489                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
490
491                 if ((uc->uc_fsuid != oattr->la_uid) &&
492                     !md_capable(uc, CFS_CAP_FOWNER))
493                         RETURN(-EPERM);
494
495                 /* The IMMUTABLE and APPEND_ONLY flags can
496                  * only be changed by the relevant capability. */
497                 if ((oldflags ^ newflags) &&
498                     !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
499                         RETURN(-EPERM);
500
501                 if (!S_ISDIR(oattr->la_mode))
502                         la->la_flags &= ~(LUSTRE_DIRSYNC_FL | LUSTRE_TOPDIR_FL);
503         }
504
505         if (oattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL) &&
506             (la->la_valid & ~LA_FLAGS) &&
507             !(flags & MDS_PERM_BYPASS))
508                 RETURN(-EPERM);
509
510         /* Check for setting the obj time. */
511         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
512             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
513                 if ((uc->uc_fsuid != oattr->la_uid) &&
514                     !md_capable(uc, CFS_CAP_FOWNER)) {
515                         rc = mdd_permission_internal(env, obj, oattr,
516                                                      MAY_WRITE);
517                         if (rc)
518                                 RETURN(rc);
519                 }
520         }
521
522         if (la->la_valid & LA_KILL_SUID) {
523                 la->la_valid &= ~LA_KILL_SUID;
524                 if ((oattr->la_mode & S_ISUID) &&
525                     !(la->la_valid & LA_MODE)) {
526                         la->la_mode = oattr->la_mode;
527                         la->la_valid |= LA_MODE;
528                 }
529                 la->la_mode &= ~S_ISUID;
530         }
531
532         if (la->la_valid & LA_KILL_SGID) {
533                 la->la_valid &= ~LA_KILL_SGID;
534                 if (((oattr->la_mode & (S_ISGID | S_IXGRP)) ==
535                         (S_ISGID | S_IXGRP)) &&
536                     !(la->la_valid & LA_MODE)) {
537                         la->la_mode = oattr->la_mode;
538                         la->la_valid |= LA_MODE;
539                 }
540                 la->la_mode &= ~S_ISGID;
541         }
542
543         /* Make sure a caller can chmod. */
544         if (la->la_valid & LA_MODE) {
545                 if (!(flags & MDS_PERM_BYPASS) &&
546                     (uc->uc_fsuid != oattr->la_uid) &&
547                     !md_capable(uc, CFS_CAP_FOWNER))
548                         RETURN(-EPERM);
549
550                 if (la->la_mode == (umode_t) -1)
551                         la->la_mode = oattr->la_mode;
552                 else
553                         la->la_mode = (la->la_mode & S_IALLUGO) |
554                                         (oattr->la_mode & ~S_IALLUGO);
555
556                 /* Also check the setgid bit! */
557                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
558                                        la->la_gid : oattr->la_gid) &&
559                     !md_capable(uc, CFS_CAP_FSETID))
560                         la->la_mode &= ~S_ISGID;
561         } else {
562                la->la_mode = oattr->la_mode;
563         }
564
565         /* Make sure a caller can chown. */
566         if (la->la_valid & LA_UID) {
567                 if (la->la_uid == (uid_t) -1)
568                         la->la_uid = oattr->la_uid;
569                 if (((uc->uc_fsuid != oattr->la_uid) ||
570                      (la->la_uid != oattr->la_uid)) &&
571                     !md_capable(uc, CFS_CAP_CHOWN))
572                         RETURN(-EPERM);
573
574                 /* If the user or group of a non-directory has been
575                  * changed by a non-root user, remove the setuid bit.
576                  * 19981026 David C Niemi <niemi@tux.org>
577                  *
578                  * Changed this to apply to all users, including root,
579                  * to avoid some races. This is the behavior we had in
580                  * 2.0. The check for non-root was definitely wrong
581                  * for 2.2 anyway, as it should have been using
582                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
583                 if (((oattr->la_mode & S_ISUID) == S_ISUID) &&
584                 !S_ISDIR(oattr->la_mode)) {
585                         la->la_mode &= ~S_ISUID;
586                         la->la_valid |= LA_MODE;
587                 }
588         }
589
590         /* Make sure caller can chgrp. */
591         if (la->la_valid & LA_GID) {
592                 if (la->la_gid == (gid_t) -1)
593                         la->la_gid = oattr->la_gid;
594                 if (((uc->uc_fsuid != oattr->la_uid) ||
595                      ((la->la_gid != oattr->la_gid) &&
596                       !lustre_in_group_p(uc, la->la_gid))) &&
597                     !md_capable(uc, CFS_CAP_CHOWN))
598                         RETURN(-EPERM);
599
600                 /* Likewise, if the user or group of a non-directory
601                  * has been changed by a non-root user, remove the
602                  * setgid bit UNLESS there is no group execute bit
603                  * (this would be a file marked for mandatory
604                  * locking).  19981026 David C Niemi <niemi@tux.org>
605                  *
606                  * Removed the fsuid check (see the comment above) --
607                  * 19990830 SD. */
608                 if (((oattr->la_mode & (S_ISGID | S_IXGRP)) ==
609                     (S_ISGID | S_IXGRP)) && !S_ISDIR(oattr->la_mode)) {
610                         la->la_mode &= ~S_ISGID;
611                         la->la_valid |= LA_MODE;
612                 }
613         }
614
615         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
616                 if (!((flags & MDS_OWNEROVERRIDE) &&
617                       (uc->uc_fsuid == oattr->la_uid)) &&
618                     !(flags & MDS_PERM_BYPASS)) {
619                         rc = mdd_permission_internal(env, obj, oattr,
620                                                      MAY_WRITE);
621                         if (rc != 0)
622                                 RETURN(rc);
623                 }
624         }
625
626         if (la->la_valid & LA_CTIME) {
627                 /* The pure setattr, it has the priority over what is
628                  * already set, do not drop it if ctime is equal. */
629                 if (la->la_ctime < oattr->la_ctime)
630                         la->la_valid &= ~(LA_ATIME | LA_MTIME | LA_CTIME);
631         }
632
633         RETURN(0);
634 }
635
636 static int mdd_changelog_data_store_by_fid(const struct lu_env *env,
637                                     struct mdd_device *mdd,
638                                     enum changelog_rec_type type, int flags,
639                                     const struct lu_fid *fid,
640                                     struct thandle *handle)
641 {
642         const struct lu_ucred *uc = lu_ucred(env);
643         struct llog_changelog_rec *rec;
644         struct lu_buf *buf;
645         int reclen;
646         int xflags = CLFE_INVALID;
647         int rc;
648
649         flags = (flags & CLF_FLAGMASK) | CLF_VERSION | CLF_EXTRA_FLAGS;
650
651         if (uc) {
652                 if (uc->uc_jobid[0] != '\0')
653                         flags |= CLF_JOBID;
654                 xflags |= CLFE_UIDGID;
655                 xflags |= CLFE_NID;
656         }
657         if (type == CL_OPEN)
658                 xflags |= CLFE_OPEN;
659
660         reclen = llog_data_len(LLOG_CHANGELOG_HDR_SZ +
661                                changelog_rec_offset(flags & CLF_SUPPORTED,
662                                                     xflags & CLFE_SUPPORTED));
663         buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
664         if (buf->lb_buf == NULL)
665                 RETURN(-ENOMEM);
666         rec = buf->lb_buf;
667
668         rec->cr_hdr.lrh_len = reclen;
669         rec->cr.cr_flags = flags;
670         rec->cr.cr_type = (__u32)type;
671         rec->cr.cr_tfid = *fid;
672         rec->cr.cr_namelen = 0;
673
674         if (flags & CLF_JOBID)
675                 mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
676
677         if (flags & CLF_EXTRA_FLAGS) {
678                 mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
679                 if (xflags & CLFE_UIDGID)
680                         mdd_changelog_rec_extra_uidgid(&rec->cr,
681                                                        uc->uc_uid, uc->uc_gid);
682                 if (xflags & CLFE_NID)
683                         mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
684                 if (xflags & CLFE_OPEN)
685                         mdd_changelog_rec_extra_omode(&rec->cr, flags);
686         }
687
688         rc = mdd_changelog_store(env, mdd, rec, handle);
689         RETURN(rc);
690 }
691
692
693 /** Store a data change changelog record
694  * If this fails, we must fail the whole transaction; we don't
695  * want the change to commit without the log entry.
696  * \param mdd_obj - mdd_object of change
697  * \param handle - transaction handle
698  */
699 int mdd_changelog_data_store(const struct lu_env *env, struct mdd_device *mdd,
700                              enum changelog_rec_type type, int flags,
701                              struct mdd_object *mdd_obj, struct thandle *handle)
702 {
703         int                              rc;
704
705         LASSERT(mdd_obj != NULL);
706         LASSERT(handle != NULL);
707
708         /* Not recording */
709         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
710                 RETURN(0);
711         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
712                 RETURN(0);
713
714         if (mdd_is_volatile_obj(mdd_obj))
715                 RETURN(0);
716
717         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
718             ktime_before(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
719                 /* Don't need multiple updates in this log */
720                 /* Don't check under lock - no big deal if we get an extra
721                    entry */
722                 RETURN(0);
723         }
724
725         rc = mdd_changelog_data_store_by_fid(env, mdd, type, flags,
726                                              mdo2fid(mdd_obj), handle);
727         if (rc == 0)
728                 mdd_obj->mod_cltime = ktime_get();
729
730         RETURN(rc);
731 }
732
733 static int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
734                   int flags, struct md_device *m, const struct lu_fid *fid)
735 {
736         struct thandle *handle;
737         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
738         int rc;
739         ENTRY;
740
741         /* Not recording */
742         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
743                 RETURN(0);
744         if (!(mdd->mdd_cl.mc_mask & (1 << type)))
745                 RETURN(0);
746
747         LASSERT(fid != NULL);
748
749         handle = mdd_trans_create(env, mdd);
750         if (IS_ERR(handle))
751                 RETURN(PTR_ERR(handle));
752
753         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
754         if (rc)
755                 GOTO(stop, rc);
756
757         rc = mdd_trans_start(env, mdd, handle);
758         if (rc)
759                 GOTO(stop, rc);
760
761         rc = mdd_changelog_data_store_by_fid(env, mdd, type, flags,
762                                                      fid, handle);
763
764 stop:
765         rc = mdd_trans_stop(env, mdd, rc, handle);
766
767         RETURN(rc);
768 }
769
770 /**
771  * Save LMA extended attributes with data from \a ma.
772  *
773  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
774  * not, LMA EA will be first read from disk, modified and write back.
775  *
776  */
777 /* Precedence for choosing record type when multiple
778  * attributes change: setattr > mtime > ctime > atime
779  * (ctime changes when mtime does, plus chmod/chown.
780  * atime and ctime are independent.) */
781 static int mdd_attr_set_changelog(const struct lu_env *env,
782                                   struct md_object *obj, struct thandle *handle,
783                                   __u64 valid)
784 {
785         struct mdd_device *mdd = mdo2mdd(obj);
786         int bits, type = 0;
787
788         bits =  (valid & LA_SIZE)  ? 1 << CL_TRUNC : 0;
789         bits |= (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
790         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
791         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
792         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
793         bits = bits & mdd->mdd_cl.mc_mask;
794         /* This is an implementation limit rather than a protocol limit */
795         CLASSERT(CL_LAST <= sizeof(int) * 8);
796         if (bits == 0)
797                 return 0;
798
799         /* The record type is the lowest non-masked set bit */
800         type = __ffs(bits);
801
802         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
803         return mdd_changelog_data_store(env, mdd, type, (int)valid,
804                                         md2mdd_obj(obj), handle);
805 }
806
807 static int mdd_declare_attr_set(const struct lu_env *env,
808                                 struct mdd_device *mdd,
809                                 struct mdd_object *obj,
810                                 const struct lu_attr *attr,
811                                 struct thandle *handle)
812 {
813         int rc;
814
815         rc = mdo_declare_attr_set(env, obj, attr, handle);
816         if (rc)
817                 return rc;
818
819 #ifdef CONFIG_FS_POSIX_ACL
820         if (attr->la_valid & LA_MODE) {
821                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
822                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
823                                    XATTR_NAME_ACL_ACCESS);
824                 mdd_read_unlock(env, obj);
825                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
826                         rc = 0;
827                 else if (rc < 0)
828                         return rc;
829
830                 if (rc != 0) {
831                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
832                         rc = mdo_declare_xattr_set(env, obj, buf,
833                                                    XATTR_NAME_ACL_ACCESS, 0,
834                                                    handle);
835                         if (rc)
836                                 return rc;
837                 }
838         }
839 #endif
840
841         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
842         return rc;
843 }
844
845 /*
846  * LU-3671
847  *
848  * permission changes may require sync operation, to mitigate performance
849  * impact, only do this for dir and when permission is reduced.
850  *
851  * For regular files, version is updated with permission change (see VBR), async
852  * permission won't cause any issue, while missing permission change on
853  * directory may affect accessibility of other objects after recovery.
854  */
855 static inline bool permission_needs_sync(const struct lu_attr *old,
856                                          const struct lu_attr *new)
857 {
858         if (!S_ISDIR(old->la_mode))
859                 return false;
860
861         if (new->la_valid & (LA_UID | LA_GID))
862                 return true;
863
864         if (new->la_valid & LA_MODE &&
865             new->la_mode & (S_ISUID | S_ISGID | S_ISVTX))
866                 return true;
867
868         if ((new->la_valid & LA_MODE) &&
869             ((new->la_mode & old->la_mode) & S_IRWXUGO) !=
870              (old->la_mode & S_IRWXUGO))
871                 return true;
872
873         return false;
874 }
875
876 /* set attr and LOV EA at once, return updated attr */
877 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
878                  const struct md_attr *ma)
879 {
880         struct mdd_object *mdd_obj = md2mdd_obj(obj);
881         struct mdd_device *mdd = mdo2mdd(obj);
882         struct thandle *handle = NULL;
883         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
884         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
885         const struct lu_attr *la = &ma->ma_attr;
886         struct lu_ucred  *uc;
887         int rc;
888         ENTRY;
889
890         /* we do not use ->attr_set() for LOV/HSM EA any more */
891         LASSERT((ma->ma_valid & MA_LOV) == 0);
892         LASSERT((ma->ma_valid & MA_HSM) == 0);
893
894         rc = mdd_la_get(env, mdd_obj, attr);
895         if (rc)
896                 RETURN(rc);
897
898         *la_copy = ma->ma_attr;
899         rc = mdd_fix_attr(env, mdd_obj, attr, la_copy, ma->ma_attr_flags);
900         if (rc)
901                 RETURN(rc);
902
903         /* no need to setattr anymore */
904         if (la_copy->la_valid == 0) {
905                 CDEBUG(D_INODE, "%s: no valid attribute on "DFID", previous"
906                        "valid is %#llx\n", mdd2obd_dev(mdd)->obd_name,
907                        PFID(mdo2fid(mdd_obj)), la->la_valid);
908
909                 RETURN(0);
910         }
911
912         /* If an unprivileged user changes group of some file,
913          * the setattr operation will be processed synchronously to
914          * honor the quota limit of the corresponding group. see LU-5152 */
915         uc = lu_ucred_check(env);
916         if (S_ISREG(attr->la_mode) && la->la_valid & LA_GID &&
917             la->la_gid != attr->la_gid && uc != NULL && uc->uc_fsuid != 0) {
918                 la_copy->la_valid |= LA_FLAGS;
919                 la_copy->la_flags |= LUSTRE_SET_SYNC_FL;
920
921                 /* Flush the possible existing sync requests to OSTs to
922                  * keep the order of sync for the current setattr operation
923                  * will be sent directly to OSTs. see LU-5152 */
924                 rc = dt_sync(env, mdd->mdd_child);
925                 if (rc)
926                         GOTO(out, rc);
927         }
928
929         handle = mdd_trans_create(env, mdd);
930         if (IS_ERR(handle)) {
931                 rc = PTR_ERR(handle);
932                 handle = NULL;
933
934                 GOTO(out, rc);
935         }
936
937         rc = mdd_declare_attr_set(env, mdd, mdd_obj, la_copy, handle);
938         if (rc)
939                 GOTO(out, rc);
940
941         rc = mdd_trans_start(env, mdd, handle);
942         if (rc)
943                 GOTO(out, rc);
944
945         if (mdd->mdd_sync_permission && permission_needs_sync(attr, la))
946                 handle->th_sync = 1;
947
948         if (la->la_valid & (LA_MTIME | LA_CTIME))
949                 CDEBUG(D_INODE, "setting mtime %llu, ctime %llu\n",
950                        la->la_mtime, la->la_ctime);
951
952         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
953         if (la_copy->la_valid) {
954                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
955
956                 if (rc == -EDQUOT && la_copy->la_flags & LUSTRE_SET_SYNC_FL) {
957                         /* rollback to the original gid */
958                         la_copy->la_flags &= ~LUSTRE_SET_SYNC_FL;
959                         la_copy->la_gid = attr->la_gid;
960                         mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
961                 }
962         }
963         mdd_write_unlock(env, mdd_obj);
964
965 out:
966         if (rc == 0)
967                 rc = mdd_attr_set_changelog(env, obj, handle,
968                                             la_copy->la_valid);
969
970         if (handle != NULL)
971                 rc = mdd_trans_stop(env, mdd, rc, handle);
972
973         return rc;
974 }
975
976 static int mdd_xattr_sanity_check(const struct lu_env *env,
977                                   struct mdd_object *obj,
978                                   const struct lu_attr *attr,
979                                   const char *name)
980 {
981         struct lu_ucred *uc     = lu_ucred_assert(env);
982         ENTRY;
983
984         if (attr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
985                 RETURN(-EPERM);
986
987         if (strncmp(XATTR_USER_PREFIX, name,
988                     sizeof(XATTR_USER_PREFIX) - 1) == 0) {
989                 /* For sticky directories, only the owner and privileged user
990                  * can write attributes. */
991                 if (S_ISDIR(attr->la_mode) && (attr->la_mode & S_ISVTX) &&
992                     (uc->uc_fsuid != attr->la_uid) &&
993                     !md_capable(uc, CFS_CAP_FOWNER))
994                         RETURN(-EPERM);
995         } else {
996                 if ((uc->uc_fsuid != attr->la_uid) &&
997                     !md_capable(uc, CFS_CAP_FOWNER))
998                         RETURN(-EPERM);
999         }
1000
1001         RETURN(0);
1002 }
1003
1004 /**
1005  * Check if a string begins with a given prefix.
1006  *
1007  * \param str     String to check
1008  * \param prefix  Substring to check at the beginning of \a str
1009  * \return true/false whether the condition is verified.
1010  */
1011 static inline bool has_prefix(const char *str, const char *prefix)
1012 {
1013         return strncmp(prefix, str, strlen(prefix)) == 0;
1014 }
1015
1016 /**
1017  * Indicate the kind of changelog to store (if any) for a xattr set/del.
1018  *
1019  * \param[in]  xattr_name  Full extended attribute name.
1020  *
1021  * \return The type of changelog to use, or -1 if no changelog is to be emitted.
1022  */
1023 static enum changelog_rec_type
1024 mdd_xattr_changelog_type(const struct lu_env *env, struct mdd_device *mdd,
1025                          const char *xattr_name)
1026 {
1027         /* Layout changes systematically recorded */
1028         if (strcmp(XATTR_NAME_LOV, xattr_name) == 0 ||
1029             strncmp(XATTR_LUSTRE_LOV, xattr_name,
1030                     strlen(XATTR_LUSTRE_LOV)) == 0)
1031                 return CL_LAYOUT;
1032
1033         /* HSM information changes systematically recorded */
1034         if (strcmp(XATTR_NAME_HSM, xattr_name) == 0)
1035                 return CL_HSM;
1036
1037         if (has_prefix(xattr_name, XATTR_USER_PREFIX) ||
1038             has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_ACCESS) ||
1039             has_prefix(xattr_name, XATTR_NAME_POSIX_ACL_DEFAULT))
1040                 return CL_XATTR;
1041
1042         return -1;
1043 }
1044
1045 static int mdd_declare_xattr_set(const struct lu_env *env,
1046                                  struct mdd_device *mdd,
1047                                  struct mdd_object *obj,
1048                                  const struct lu_buf *buf,
1049                                  const char *name,
1050                                  int fl, struct thandle *handle)
1051 {
1052         int     rc;
1053
1054         rc = mdo_declare_xattr_set(env, obj, buf, name, fl, handle);
1055         if (rc)
1056                 return rc;
1057
1058         if (mdd_xattr_changelog_type(env, mdd, name) < 0)
1059                 return 0; /* no changelog to store */
1060
1061         return mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1062 }
1063
1064 /*
1065  * Compare current and future data of HSM EA and add a changelog if needed.
1066  *
1067  * Caller should have write-locked \param obj.
1068  *
1069  * \param buf - Future HSM EA content.
1070  * \retval 0 if no changelog is needed or changelog was added properly.
1071  * \retval -ve errno if there was a problem
1072  */
1073 static int mdd_hsm_update_locked(const struct lu_env *env,
1074                                  struct md_object *obj,
1075                                  const struct lu_buf *buf,
1076                                  struct thandle *handle, int *cl_flags)
1077 {
1078         struct mdd_thread_info *info = mdd_env_info(env);
1079         struct mdd_object      *mdd_obj = md2mdd_obj(obj);
1080         struct lu_buf          *current_buf;
1081         struct md_hsm          *current_mh;
1082         struct md_hsm          *new_mh;
1083         int                     rc;
1084         ENTRY;
1085
1086         OBD_ALLOC_PTR(current_mh);
1087         if (current_mh == NULL)
1088                 RETURN(-ENOMEM);
1089
1090         /* Read HSM attrs from disk */
1091         current_buf = lu_buf_check_and_alloc(&info->mti_xattr_buf,
1092                                 mdo2mdd(obj)->mdd_dt_conf.ddp_max_ea_size);
1093         rc = mdo_xattr_get(env, mdd_obj, current_buf, XATTR_NAME_HSM);
1094         rc = lustre_buf2hsm(current_buf->lb_buf, rc, current_mh);
1095         if (rc < 0 && rc != -ENODATA)
1096                 GOTO(free, rc);
1097         else if (rc == -ENODATA)
1098                 current_mh->mh_flags = 0;
1099
1100         /* Map future HSM xattr */
1101         OBD_ALLOC_PTR(new_mh);
1102         if (new_mh == NULL)
1103                 GOTO(free, rc = -ENOMEM);
1104         lustre_buf2hsm(buf->lb_buf, buf->lb_len, new_mh);
1105
1106         rc = 0;
1107
1108         /* Flags differ, set flags for the changelog that will be added */
1109         if (current_mh->mh_flags != new_mh->mh_flags) {
1110                 hsm_set_cl_event(cl_flags, HE_STATE);
1111                 if (new_mh->mh_flags & HS_DIRTY)
1112                         hsm_set_cl_flags(cl_flags, CLF_HSM_DIRTY);
1113         }
1114
1115         OBD_FREE_PTR(new_mh);
1116         EXIT;
1117 free:
1118         OBD_FREE_PTR(current_mh);
1119         return rc;
1120 }
1121
1122 static int mdd_object_pfid_replace(const struct lu_env *env,
1123                                    struct mdd_object *o)
1124 {
1125         struct mdd_device *mdd = mdo2mdd(&o->mod_obj);
1126         struct thandle *handle;
1127         int rc;
1128
1129         handle = mdd_trans_create(env, mdd);
1130         if (IS_ERR(handle))
1131                 RETURN(PTR_ERR(handle));
1132
1133         handle->th_complex = 1;
1134
1135         /* it doesn't need to track the PFID update via llog, because LFSCK
1136          * will repair it even it goes wrong */
1137         rc = mdd_declare_xattr_set(env, mdd, o, NULL, XATTR_NAME_FID,
1138                                    0, handle);
1139         if (rc)
1140                 GOTO(out, rc);
1141
1142         rc = mdd_trans_start(env, mdd, handle);
1143         if (rc != 0)
1144                 GOTO(out, rc);
1145
1146         rc = mdo_xattr_set(env, o, NULL, XATTR_NAME_FID, 0, handle);
1147         if (rc)
1148                 GOTO(out, rc);
1149
1150 out:
1151         mdd_trans_stop(env, mdd, rc, handle);
1152         return rc;
1153 }
1154
1155
1156 static int mdd_declare_xattr_del(const struct lu_env *env,
1157                                  struct mdd_device *mdd,
1158                                  struct mdd_object *obj,
1159                                  const char *name,
1160                                  struct thandle *handle);
1161
1162 static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1163                          const char *name);
1164
1165 static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
1166                            struct md_object *md_vic)
1167 {
1168         struct mdd_device *mdd = mdo2mdd(md_obj);
1169         struct mdd_object *obj = md2mdd_obj(md_obj);
1170         struct mdd_object *vic = md2mdd_obj(md_vic);
1171         struct lu_buf *buf = &mdd_env_info(env)->mti_buf[0];
1172         struct lu_buf *buf_vic = &mdd_env_info(env)->mti_buf[1];
1173         struct lov_mds_md *lmm;
1174         struct thandle *handle;
1175         int rc;
1176         ENTRY;
1177
1178         rc = lu_fid_cmp(mdo2fid(obj), mdo2fid(vic));
1179         if (rc == 0) /* same fid */
1180                 RETURN(-EPERM);
1181
1182         handle = mdd_trans_create(env, mdd);
1183         if (IS_ERR(handle))
1184                 RETURN(PTR_ERR(handle));
1185
1186         if (rc > 0) {
1187                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1188                 mdd_write_lock(env, vic, MOR_TGT_CHILD);
1189         } else {
1190                 mdd_write_lock(env, vic, MOR_TGT_CHILD);
1191                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1192         }
1193
1194         /* get EA of victim file */
1195         memset(buf_vic, 0, sizeof(*buf_vic));
1196         rc = mdd_get_lov_ea(env, vic, buf_vic);
1197         if (rc < 0) {
1198                 if (rc == -ENODATA)
1199                         rc = 0;
1200                 GOTO(out, rc);
1201         }
1202
1203         /* parse the layout of victim file */
1204         lmm = buf_vic->lb_buf;
1205         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
1206                 GOTO(out, rc = -EINVAL);
1207
1208         /* save EA of target file for restore */
1209         memset(buf, 0, sizeof(*buf));
1210         rc = mdd_get_lov_ea(env, obj, buf);
1211         if (rc < 0)
1212                 GOTO(out, rc);
1213
1214         /* Get rid of the layout from victim object */
1215         rc = mdd_declare_xattr_del(env, mdd, vic, XATTR_NAME_LOV, handle);
1216         if (rc)
1217                 GOTO(out, rc);
1218
1219         rc = mdd_declare_xattr_set(env, mdd, obj, buf_vic, XATTR_LUSTRE_LOV,
1220                                    LU_XATTR_MERGE, handle);
1221         if (rc)
1222                 GOTO(out, rc);
1223
1224         rc = mdd_trans_start(env, mdd, handle);
1225         if (rc != 0)
1226                 GOTO(out, rc);
1227
1228         rc = mdo_xattr_set(env, obj, buf_vic, XATTR_LUSTRE_LOV, LU_XATTR_MERGE,
1229                            handle);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mdo_xattr_del(env, vic, XATTR_NAME_LOV, handle);
1234         if (rc) /* wtf? */
1235                 GOTO(out_restore, rc);
1236
1237         (void)mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
1238         (void)mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle);
1239         EXIT;
1240
1241 out_restore:
1242         if (rc) {
1243                 int rc2 = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1244                                         LU_XATTR_REPLACE, handle);
1245                 if (rc2)
1246                         CERROR("%s: failed to rollback of layout of: "DFID
1247                                ": %d, file state unknown\n",
1248                                mdd_obj_dev_name(obj), PFID(mdo2fid(obj)), rc2);
1249         }
1250
1251 out:
1252         mdd_trans_stop(env, mdd, rc, handle);
1253         mdd_write_unlock(env, obj);
1254         mdd_write_unlock(env, vic);
1255         lu_buf_free(buf);
1256         lu_buf_free(buf_vic);
1257
1258         if (!rc)
1259                 (void) mdd_object_pfid_replace(env, obj);
1260
1261         return rc;
1262 }
1263
1264 /**
1265  * Extract the mirror with specified mirror id, and store the splitted
1266  * mirror layout to @buf.
1267  *
1268  * \param[in] comp_v1   mirrored layout
1269  * \param[in] mirror_id the mirror with mirror_id to be extracted
1270  * \param[out] buf      store the layout excluding the extracted mirror,
1271  *                      caller free the buffer we allocated in this function
1272  * \param[out] buf_vic  store the extracted layout, caller free the buffer
1273  *                      we allocated in this function
1274  *
1275  * \retval      0 on success; < 0 if error happens
1276  */
1277 static int mdd_split_ea(struct lov_comp_md_v1 *comp_v1, __u16 mirror_id,
1278                         struct lu_buf *buf, struct lu_buf *buf_vic)
1279 {
1280         struct lov_comp_md_v1 *comp_rem;
1281         struct lov_comp_md_v1 *comp_vic;
1282         struct lov_comp_md_entry_v1 *entry;
1283         struct lov_comp_md_entry_v1 *entry_rem;
1284         struct lov_comp_md_entry_v1 *entry_vic;
1285         __u16 mirror_cnt;
1286         __u16 comp_cnt, count = 0;
1287         int lmm_size, lmm_size_vic = 0;
1288         int i, j, k;
1289         int offset, offset_rem, offset_vic;
1290
1291         mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
1292         /* comp_v1 should contains more than 1 mirror */
1293         if (mirror_cnt <= 1)
1294                 return -EINVAL;
1295         comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
1296         lmm_size = le32_to_cpu(comp_v1->lcm_size);
1297
1298         for (i = 0; i < comp_cnt; i++) {
1299                 entry = &comp_v1->lcm_entries[i];
1300                 if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id) {
1301                         count++;
1302                         lmm_size_vic += sizeof(*entry);
1303                         lmm_size_vic += le32_to_cpu(entry->lcme_size);
1304                 } else if (count > 0) {
1305                         /* find the specified mirror */
1306                         break;
1307                 }
1308         }
1309
1310         if (count == 0)
1311                 return -EINVAL;
1312
1313         lu_buf_alloc(buf, lmm_size - lmm_size_vic);
1314         if (!buf->lb_buf)
1315                 return -ENOMEM;
1316
1317         lu_buf_alloc(buf_vic, sizeof(*comp_vic) + lmm_size_vic);
1318         if (!buf_vic->lb_buf) {
1319                 lu_buf_free(buf);
1320                 return -ENOMEM;
1321         }
1322
1323         comp_rem = (struct lov_comp_md_v1 *)buf->lb_buf;
1324         comp_vic = (struct lov_comp_md_v1 *)buf_vic->lb_buf;
1325
1326         memcpy(comp_rem, comp_v1, sizeof(*comp_v1));
1327         comp_rem->lcm_mirror_count = cpu_to_le16(mirror_cnt - 2);
1328         comp_rem->lcm_entry_count = cpu_to_le32(comp_cnt - count);
1329         comp_rem->lcm_size = cpu_to_le32(lmm_size - lmm_size_vic);
1330         if (!comp_rem->lcm_mirror_count)
1331                 comp_rem->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
1332
1333         memset(comp_vic, 0, sizeof(*comp_v1));
1334         comp_vic->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
1335         comp_vic->lcm_mirror_count = 0;
1336         comp_vic->lcm_entry_count = cpu_to_le32(count);
1337         comp_vic->lcm_size = cpu_to_le32(lmm_size_vic + sizeof(*comp_vic));
1338         comp_vic->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
1339         comp_vic->lcm_layout_gen = 0;
1340
1341         offset = sizeof(*comp_v1) + sizeof(*entry) * comp_cnt;
1342         offset_rem = sizeof(*comp_rem) +
1343                      sizeof(*entry_rem) * (comp_cnt - count);
1344         offset_vic = sizeof(*comp_vic) + sizeof(*entry_vic) * count;
1345         for (i = j = k = 0; i < comp_cnt; i++) {
1346                 struct lov_mds_md *lmm, *lmm_dst;
1347                 bool vic = false;
1348
1349                 entry = &comp_v1->lcm_entries[i];
1350                 entry_vic = &comp_vic->lcm_entries[j];
1351                 entry_rem = &comp_rem->lcm_entries[k];
1352
1353                 if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id)
1354                         vic = true;
1355
1356                 /* copy component entry */
1357                 if (vic) {
1358                         memcpy(entry_vic, entry, sizeof(*entry));
1359                         entry_vic->lcme_flags &= cpu_to_le32(LCME_FL_INIT);
1360                         entry_vic->lcme_offset = cpu_to_le32(offset_vic);
1361                         j++;
1362                 } else {
1363                         memcpy(entry_rem, entry, sizeof(*entry));
1364                         entry_rem->lcme_offset = cpu_to_le32(offset_rem);
1365                         k++;
1366                 }
1367
1368                 lmm = (struct lov_mds_md *)((char *)comp_v1 + offset);
1369                 if (vic)
1370                         lmm_dst = (struct lov_mds_md *)
1371                                         ((char *)comp_vic + offset_vic);
1372                 else
1373                         lmm_dst = (struct lov_mds_md *)
1374                                         ((char *)comp_rem + offset_rem);
1375
1376                 /* copy component entry blob */
1377                 memcpy(lmm_dst, lmm, le32_to_cpu(entry->lcme_size));
1378
1379                 /* blob offset advance */
1380                 offset += le32_to_cpu(entry->lcme_size);
1381                 if (vic)
1382                         offset_vic += le32_to_cpu(entry->lcme_size);
1383                 else
1384                         offset_rem += le32_to_cpu(entry->lcme_size);
1385         }
1386
1387         return 0;
1388 }
1389
1390 static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
1391                            struct md_rejig_data *mrd)
1392 {
1393         struct mdd_device *mdd = mdo2mdd(md_obj);
1394         struct mdd_object *obj = md2mdd_obj(md_obj);
1395         struct mdd_object *vic = md2mdd_obj(mrd->mrd_obj);
1396         struct lu_buf *buf = &mdd_env_info(env)->mti_buf[0];
1397         struct lu_buf *buf_save = &mdd_env_info(env)->mti_buf[1];
1398         struct lu_buf *buf_vic = &mdd_env_info(env)->mti_buf[2];
1399         struct lov_comp_md_v1 *lcm;
1400         struct thandle *handle;
1401         int rc;
1402         ENTRY;
1403
1404         rc = lu_fid_cmp(mdo2fid(obj), mdo2fid(vic));
1405         if (rc == 0) /* same fid */
1406                 RETURN(-EPERM);
1407
1408         handle = mdd_trans_create(env, mdd);
1409         if (IS_ERR(handle))
1410                 RETURN(PTR_ERR(handle));
1411
1412         if (rc > 0) {
1413                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1414                 mdd_write_lock(env, vic, MOR_TGT_CHILD);
1415         } else {
1416                 mdd_write_lock(env, vic, MOR_TGT_CHILD);
1417                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1418         }
1419
1420         /* get EA of mirrored file */
1421         memset(buf_save, 0, sizeof(*buf));
1422         rc = mdd_get_lov_ea(env, obj, buf_save);
1423         if (rc < 0)
1424                 GOTO(out, rc);
1425
1426         lcm = buf_save->lb_buf;
1427         if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
1428                 GOTO(out, rc = -EINVAL);
1429
1430         /**
1431          * Extract the mirror with specified mirror id, and store the splitted
1432          * mirror layout to the victim file.
1433          */
1434         memset(buf, 0, sizeof(*buf));
1435         memset(buf_vic, 0, sizeof(*buf_vic));
1436         rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
1437         if (rc < 0)
1438                 GOTO(out, rc);
1439
1440         rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
1441                                    LU_XATTR_SPLIT, handle);
1442         if (rc)
1443                 GOTO(out, rc);
1444         rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic, XATTR_NAME_LOV,
1445                                    LU_XATTR_SPLIT, handle);
1446         if (rc)
1447                 GOTO(out, rc);
1448
1449         rc = mdd_trans_start(env, mdd, handle);
1450         if (rc)
1451                 GOTO(out, rc);
1452
1453         rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
1454                            handle);
1455         if (rc)
1456                 GOTO(out, rc);
1457
1458         rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV, LU_XATTR_CREATE,
1459                            handle);
1460         if (rc)
1461                 GOTO(out_restore, rc);
1462
1463         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
1464         if (rc)
1465                 GOTO(out, rc);
1466
1467         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle);
1468         if (rc)
1469                 GOTO(out, rc);
1470         EXIT;
1471
1472 out_restore:
1473         if (rc) {
1474                 /* restore obj's layout */
1475                 int rc2 = mdo_xattr_set(env, obj, buf_save, XATTR_NAME_LOV,
1476                                         LU_XATTR_REPLACE, handle);
1477                 if (rc2)
1478                         CERROR("%s: failed to rollback of layout of: "DFID
1479                                ": %d, file state unkonwn.\n",
1480                                mdd_obj_dev_name(obj), PFID(mdo2fid(obj)), rc2);
1481         }
1482 out:
1483         mdd_trans_stop(env, mdd, rc, handle);
1484         mdd_write_unlock(env, obj);
1485         mdd_write_unlock(env, vic);
1486         lu_buf_free(buf_save);
1487         lu_buf_free(buf);
1488         lu_buf_free(buf_vic);
1489
1490         if (!rc)
1491                 (void) mdd_object_pfid_replace(env, obj);
1492
1493         return rc;
1494 }
1495
1496 static int mdd_layout_merge_allowed(const struct lu_env *env,
1497                                     struct md_object *target,
1498                                     struct md_object *victim)
1499 {
1500         struct mdd_object *o1 = md2mdd_obj(target);
1501
1502         /* cannot extend directory's LOVEA */
1503         if (S_ISDIR(mdd_object_type(o1))) {
1504                 CERROR("%s: Don't extend directory's LOVEA, just set it.\n",
1505                        mdd_obj_dev_name(o1));
1506                 RETURN(-EISDIR);
1507         }
1508
1509         RETURN(0);
1510 }
1511
1512 /**
1513  * The caller should guarantee to update the object ctime
1514  * after xattr_set if needed.
1515  */
1516 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1517                          const struct lu_buf *buf, const char *name,
1518                          int fl)
1519 {
1520         struct mdd_object       *mdd_obj = md2mdd_obj(obj);
1521         struct lu_attr          *attr = MDD_ENV_VAR(env, cattr);
1522         struct mdd_device       *mdd = mdo2mdd(obj);
1523         struct thandle          *handle;
1524         enum changelog_rec_type  cl_type;
1525         int                      cl_flags = 0;
1526         int                      rc;
1527         ENTRY;
1528
1529         rc = mdd_la_get(env, mdd_obj, attr);
1530         if (rc)
1531                 RETURN(rc);
1532
1533         rc = mdd_xattr_sanity_check(env, mdd_obj, attr, name);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         if (strcmp(name, XATTR_LUSTRE_LOV) == 0 &&
1538             (fl == LU_XATTR_MERGE || fl == LU_XATTR_SPLIT)) {
1539                 struct md_rejig_data *mrd = buf->lb_buf;
1540                 struct md_object *victim = mrd->mrd_obj;
1541
1542                 if (buf->lb_len != sizeof(*mrd))
1543                         RETURN(-EINVAL);
1544
1545                 rc = mdd_layout_merge_allowed(env, obj, victim);
1546                 if (rc)
1547                         RETURN(rc);
1548
1549                 if (fl == LU_XATTR_MERGE)
1550                         /* merge layout of victim as a mirror of obj's. */
1551                         rc = mdd_xattr_merge(env, obj, victim);
1552                 else
1553                         rc = mdd_xattr_split(env, obj, mrd);
1554                 RETURN(rc);
1555         }
1556
1557         if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
1558             strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
1559                 struct posix_acl *acl;
1560
1561                 /* user may set empty ACL, which should be treated as removing
1562                  * ACL. */
1563                 acl = posix_acl_from_xattr(&init_user_ns, buf->lb_buf,
1564                                            buf->lb_len);
1565                 if (IS_ERR(acl))
1566                         RETURN(PTR_ERR(acl));
1567                 if (acl == NULL) {
1568                         rc = mdd_xattr_del(env, obj, name);
1569                         RETURN(rc);
1570                 }
1571                 posix_acl_release(acl);
1572         }
1573
1574         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1575                 rc = mdd_acl_set(env, mdd_obj, attr, buf, fl);
1576                 RETURN(rc);
1577         }
1578
1579         handle = mdd_trans_create(env, mdd);
1580         if (IS_ERR(handle))
1581                 RETURN(PTR_ERR(handle));
1582
1583         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, fl, handle);
1584         if (rc)
1585                 GOTO(stop, rc);
1586
1587         rc = mdd_trans_start(env, mdd, handle);
1588         if (rc)
1589                 GOTO(stop, rc);
1590
1591         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1592
1593         if (strcmp(XATTR_NAME_HSM, name) == 0) {
1594                 rc = mdd_hsm_update_locked(env, obj, buf, handle, &cl_flags);
1595                 if (rc) {
1596                         mdd_write_unlock(env, mdd_obj);
1597                         GOTO(stop, rc);
1598                 }
1599         }
1600
1601         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle);
1602         mdd_write_unlock(env, mdd_obj);
1603         if (rc)
1604                 GOTO(stop, rc);
1605
1606         cl_type = mdd_xattr_changelog_type(env, mdd, name);
1607         if (cl_type < 0)
1608                 GOTO(stop, rc = 0);
1609
1610         rc = mdd_changelog_data_store(env, mdd, cl_type, cl_flags, mdd_obj,
1611                                       handle);
1612
1613         EXIT;
1614 stop:
1615         return mdd_trans_stop(env, mdd, rc, handle);
1616 }
1617
1618 static int mdd_declare_xattr_del(const struct lu_env *env,
1619                                  struct mdd_device *mdd,
1620                                  struct mdd_object *obj,
1621                                  const char *name,
1622                                  struct thandle *handle)
1623 {
1624         int rc;
1625
1626         rc = mdo_declare_xattr_del(env, obj, name, handle);
1627         if (rc)
1628                 return rc;
1629
1630         if (mdd_xattr_changelog_type(env, mdd, name) < 0)
1631                 return 0; /* no changelog to store */
1632
1633         return mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
1634 }
1635
1636 /**
1637  * The caller should guarantee to update the object ctime
1638  * after xattr_set if needed.
1639  */
1640 static int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1641                          const char *name)
1642 {
1643         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1644         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
1645         struct mdd_device *mdd = mdo2mdd(obj);
1646         struct thandle *handle;
1647         int rc;
1648         ENTRY;
1649
1650         rc = mdd_la_get(env, mdd_obj, attr);
1651         if (rc)
1652                 RETURN(rc);
1653
1654         rc = mdd_xattr_sanity_check(env, mdd_obj, attr, name);
1655         if (rc)
1656                 RETURN(rc);
1657
1658         handle = mdd_trans_create(env, mdd);
1659         if (IS_ERR(handle))
1660                 RETURN(PTR_ERR(handle));
1661
1662         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1663         if (rc)
1664                 GOTO(stop, rc);
1665
1666         rc = mdd_trans_start(env, mdd, handle);
1667         if (rc)
1668                 GOTO(stop, rc);
1669
1670         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1671         rc = mdo_xattr_del(env, mdd_obj, name, handle);
1672         mdd_write_unlock(env, mdd_obj);
1673         if (rc)
1674                 GOTO(stop, rc);
1675
1676         if (mdd_xattr_changelog_type(env, mdd, name) < 0)
1677                 GOTO(stop, rc = 0);
1678
1679         rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj, handle);
1680
1681         EXIT;
1682 stop:
1683         return mdd_trans_stop(env, mdd, rc, handle);
1684 }
1685
1686 /*
1687  * read lov EA of an object
1688  * return the lov EA in an allocated lu_buf
1689  */
1690 int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
1691                    struct lu_buf *lmm_buf)
1692 {
1693         struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
1694         int              rc, bufsize;
1695         ENTRY;
1696
1697 repeat:
1698         rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
1699
1700         if (rc == -ERANGE) {
1701                 /* mti_big_buf is allocated but is too small
1702                  * we need to increase it */
1703                 buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
1704                                              buf->lb_len * 2);
1705                 if (buf->lb_buf == NULL)
1706                         GOTO(out, rc = -ENOMEM);
1707                 goto repeat;
1708         }
1709
1710         if (rc < 0)
1711                 RETURN(rc);
1712
1713         if (rc == 0)
1714                 RETURN(-ENODATA);
1715
1716         bufsize = rc;
1717         if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
1718                 /* mti_big_buf was not allocated, so we have to
1719                  * allocate it based on the ea size */
1720                 buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
1721                                              bufsize);
1722                 if (buf->lb_buf == NULL)
1723                         GOTO(out, rc = -ENOMEM);
1724                 goto repeat;
1725         }
1726
1727         lu_buf_alloc(lmm_buf, bufsize);
1728         if (lmm_buf->lb_buf == NULL)
1729                 GOTO(out, rc = -ENOMEM);
1730
1731         memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
1732         rc = 0;
1733         EXIT;
1734
1735 out:
1736         if (rc < 0)
1737                 lu_buf_free(lmm_buf);
1738         return rc;
1739 }
1740
1741 static int mdd_xattr_hsm_replace(const struct lu_env *env,
1742                                  struct mdd_object *o, struct lu_buf *buf,
1743                                  struct thandle *handle)
1744 {
1745         struct hsm_attrs *attrs;
1746         __u32 hsm_flags;
1747         int flags = 0;
1748         int rc;
1749         ENTRY;
1750
1751         rc = mdo_xattr_set(env, o, buf, XATTR_NAME_HSM, LU_XATTR_REPLACE,
1752                            handle);
1753         if (rc != 0)
1754                 RETURN(rc);
1755
1756         attrs = buf->lb_buf;
1757         hsm_flags = le32_to_cpu(attrs->hsm_flags);
1758         if (!(hsm_flags & HS_RELEASED) || mdd_is_dead_obj(o))
1759                 RETURN(0);
1760
1761         /* Add a changelog record for release. */
1762         hsm_set_cl_event(&flags, HE_RELEASE);
1763         rc = mdd_changelog_data_store(env, mdo2mdd(&o->mod_obj), CL_HSM,
1764                                       flags, o, handle);
1765         RETURN(rc);
1766 }
1767
1768 /*
1769  *  check if layout swapping between 2 objects is allowed
1770  *  the rules are:
1771  *  - only normal FIDs or non-system IGIFs
1772  *  - same type of objects
1773  *  - same owner/group (so quotas are still valid) unless this is from HSM
1774  *    release.
1775  */
1776 static int mdd_layout_swap_allowed(const struct lu_env *env,
1777                                    struct mdd_object *o1,
1778                                    const struct lu_attr *attr1,
1779                                    struct mdd_object *o2,
1780                                    const struct lu_attr *attr2,
1781                                    __u64 flags)
1782 {
1783         const struct lu_fid     *fid1, *fid2;
1784         ENTRY;
1785
1786         fid1 = mdo2fid(o1);
1787         fid2 = mdo2fid(o2);
1788
1789         if (!fid_is_norm(fid1) &&
1790             (!fid_is_igif(fid1) || IS_ERR(mdd_links_get(env, o1))))
1791                 RETURN(-EBADF);
1792
1793         if (!fid_is_norm(fid2) &&
1794             (!fid_is_igif(fid2) || IS_ERR(mdd_links_get(env, o2))))
1795                 RETURN(-EBADF);
1796
1797         if (mdd_object_type(o1) != mdd_object_type(o2)) {
1798                 if (S_ISDIR(mdd_object_type(o1)))
1799                         RETURN(-ENOTDIR);
1800                 if (S_ISREG(mdd_object_type(o1)))
1801                         RETURN(-EISDIR);
1802                 RETURN(-EBADF);
1803         }
1804
1805         if (flags & SWAP_LAYOUTS_MDS_HSM)
1806                 RETURN(0);
1807
1808         if ((attr1->la_uid != attr2->la_uid) ||
1809             (attr1->la_gid != attr2->la_gid))
1810                 RETURN(-EPERM);
1811
1812         RETURN(0);
1813 }
1814
1815 /* XXX To set the proper lmm_oi & lmm_layout_gen when swap layouts, we have to
1816  *     look into the layout in MDD layer. */
1817 static int mdd_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi, bool get)
1818 {
1819         struct lov_comp_md_v1   *comp_v1;
1820         struct lov_mds_md       *v1;
1821         int                      i, ent_count;
1822         __u32                    off;
1823
1824         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
1825                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
1826                 ent_count = le16_to_cpu(comp_v1->lcm_entry_count);
1827
1828                 if (ent_count == 0)
1829                         return -EINVAL;
1830
1831                 if (get) {
1832                         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
1833                         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
1834                         *oi = v1->lmm_oi;
1835                 } else {
1836                         for (i = 0; i < le32_to_cpu(ent_count); i++) {
1837                                 off = le32_to_cpu(comp_v1->lcm_entries[i].
1838                                                 lcme_offset);
1839                                 v1 = (struct lov_mds_md *)((char *)comp_v1 +
1840                                                 off);
1841                                 v1->lmm_oi = *oi;
1842                         }
1843                 }
1844         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1845                    le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
1846                 if (get)
1847                         *oi = lmm->lmm_oi;
1848                 else
1849                         lmm->lmm_oi = *oi;
1850         } else {
1851                 return -EINVAL;
1852         }
1853         return 0;
1854 }
1855
1856 static inline int mdd_get_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi)
1857 {
1858         return mdd_lmm_oi(lmm, oi, true);
1859 }
1860
1861 static inline int mdd_set_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi)
1862 {
1863         return mdd_lmm_oi(lmm, oi, false);
1864 }
1865
1866 static int mdd_lmm_gen(struct lov_mds_md *lmm, __u32 *gen, bool get)
1867 {
1868         struct lov_comp_md_v1 *comp_v1;
1869
1870         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
1871                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
1872                 if (get)
1873                         *gen = le32_to_cpu(comp_v1->lcm_layout_gen);
1874                 else
1875                         comp_v1->lcm_layout_gen = cpu_to_le32(*gen);
1876         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1877                    le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
1878                 __u16 tmp_gen = *gen;
1879                 if (get)
1880                         *gen = le16_to_cpu(lmm->lmm_layout_gen);
1881                 else
1882                         lmm->lmm_layout_gen = cpu_to_le16(tmp_gen);
1883         } else {
1884                 return -EINVAL;
1885         }
1886         return 0;
1887 }
1888
1889 static inline int mdd_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
1890 {
1891         return mdd_lmm_gen(lmm, gen, true);
1892 }
1893
1894 static inline int mdd_set_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
1895 {
1896         return mdd_lmm_gen(lmm, gen, false);
1897 }
1898
1899 /**
1900  * swap layouts between 2 lustre objects
1901  */
1902 static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
1903                             struct md_object *obj2, __u64 flags)
1904 {
1905         struct mdd_thread_info  *info = mdd_env_info(env);
1906         struct mdd_object       *fst_o = md2mdd_obj(obj1);
1907         struct mdd_object       *snd_o = md2mdd_obj(obj2);
1908         struct lu_attr          *fst_la = MDD_ENV_VAR(env, cattr);
1909         struct lu_attr          *snd_la = MDD_ENV_VAR(env, tattr);
1910         struct mdd_device       *mdd = mdo2mdd(obj1);
1911         struct lov_mds_md       *fst_lmm, *snd_lmm;
1912         struct lu_buf           *fst_buf = &info->mti_buf[0];
1913         struct lu_buf           *snd_buf = &info->mti_buf[1];
1914         struct lu_buf           *fst_hsm_buf = &info->mti_buf[2];
1915         struct lu_buf           *snd_hsm_buf = &info->mti_buf[3];
1916         struct ost_id           *saved_oi = NULL;
1917         struct thandle          *handle;
1918         __u32                    fst_gen, snd_gen, saved_gen;
1919         int                      fst_fl;
1920         int                      rc;
1921         int                      rc2;
1922         ENTRY;
1923
1924         CLASSERT(ARRAY_SIZE(info->mti_buf) >= 4);
1925         memset(info->mti_buf, 0, sizeof(info->mti_buf));
1926
1927         /* we have to sort the 2 obj, so locking will always
1928          * be in the same order, even in case of 2 concurrent swaps */
1929         rc = lu_fid_cmp(mdo2fid(fst_o), mdo2fid(snd_o));
1930         if (rc == 0) /* same fid ? */
1931                 RETURN(-EPERM);
1932
1933         if (rc < 0)
1934                 swap(fst_o, snd_o);
1935
1936         rc = mdd_la_get(env, fst_o, fst_la);
1937         if (rc != 0)
1938                 RETURN(rc);
1939
1940         rc = mdd_la_get(env, snd_o, snd_la);
1941         if (rc != 0)
1942                 RETURN(rc);
1943
1944         /* check if layout swapping is allowed */
1945         rc = mdd_layout_swap_allowed(env, fst_o, fst_la, snd_o, snd_la, flags);
1946         if (rc != 0)
1947                 RETURN(rc);
1948
1949         handle = mdd_trans_create(env, mdd);
1950         if (IS_ERR(handle))
1951                 RETURN(PTR_ERR(handle));
1952
1953         /* objects are already sorted */
1954         mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
1955         mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
1956
1957         rc = mdd_get_lov_ea(env, fst_o, fst_buf);
1958         if (rc < 0 && rc != -ENODATA)
1959                 GOTO(stop, rc);
1960
1961         rc = mdd_get_lov_ea(env, snd_o, snd_buf);
1962         if (rc < 0 && rc != -ENODATA)
1963                 GOTO(stop, rc);
1964
1965         /* swapping 2 non existant layouts is a success */
1966         if (fst_buf->lb_buf == NULL && snd_buf->lb_buf == NULL)
1967                 GOTO(stop, rc = 0);
1968
1969         /* to help inode migration between MDT, it is better to
1970          * start by the no layout file (if one), so we order the swap */
1971         if (snd_buf->lb_buf == NULL) {
1972                 swap(fst_o, snd_o);
1973                 swap(fst_buf, snd_buf);
1974         }
1975
1976         fst_gen = snd_gen = 0;
1977         /* lmm and generation layout initialization */
1978         if (fst_buf->lb_buf != NULL) {
1979                 fst_lmm = fst_buf->lb_buf;
1980                 mdd_get_lmm_gen(fst_lmm, &fst_gen);
1981                 fst_fl  = LU_XATTR_REPLACE;
1982         } else {
1983                 fst_lmm = NULL;
1984                 fst_gen = 0;
1985                 fst_fl  = LU_XATTR_CREATE;
1986         }
1987
1988         snd_lmm = snd_buf->lb_buf;
1989         mdd_get_lmm_gen(snd_lmm, &snd_gen);
1990
1991         saved_gen = fst_gen;
1992         /* increase the generation layout numbers */
1993         snd_gen++;
1994         fst_gen++;
1995
1996         /*
1997          * XXX The layout generation is used to generate component IDs for
1998          *     the composite file, we have to do some special tweaks to make
1999          *     sure the layout generation is always adequate for that job.
2000          */
2001
2002         /* Skip invalid generation number for composite layout */
2003         if ((snd_gen & LCME_ID_MASK) == 0)
2004                 snd_gen++;
2005         if ((fst_gen & LCME_ID_MASK) == 0)
2006                 fst_gen++;
2007         /* Make sure the generation is greater than all the component IDs */
2008         if (fst_gen < snd_gen)
2009                 fst_gen = snd_gen;
2010         else if (fst_gen > snd_gen)
2011                 snd_gen = fst_gen;
2012
2013         /* set the file specific informations in lmm */
2014         if (fst_lmm != NULL) {
2015                 saved_oi = &info->mti_oa.o_oi;
2016                 mdd_get_lmm_oi(fst_lmm, saved_oi);
2017                 mdd_set_lmm_gen(fst_lmm, &snd_gen);
2018                 mdd_set_lmm_oi(fst_lmm, &snd_lmm->lmm_oi);
2019                 mdd_set_lmm_oi(snd_lmm, saved_oi);
2020         } else {
2021                 if ((snd_lmm->lmm_magic & cpu_to_le32(LOV_MAGIC_MASK)) ==
2022                     cpu_to_le32(LOV_MAGIC_MAGIC))
2023                         snd_lmm->lmm_magic |= cpu_to_le32(LOV_MAGIC_DEFINED);
2024                 else
2025                         GOTO(stop, rc = -EPROTO);
2026         }
2027         mdd_set_lmm_gen(snd_lmm, &fst_gen);
2028
2029         /* Prepare HSM attribute if it's required */
2030         if (flags & SWAP_LAYOUTS_MDS_HSM) {
2031                 const int buflen = sizeof(struct hsm_attrs);
2032
2033                 lu_buf_alloc(fst_hsm_buf, buflen);
2034                 lu_buf_alloc(snd_hsm_buf, buflen);
2035                 if (fst_hsm_buf->lb_buf == NULL || snd_hsm_buf->lb_buf == NULL)
2036                         GOTO(stop, rc = -ENOMEM);
2037
2038                 /* Read HSM attribute */
2039                 rc = mdo_xattr_get(env, fst_o, fst_hsm_buf, XATTR_NAME_HSM);
2040                 if (rc < 0)
2041                         GOTO(stop, rc);
2042
2043                 rc = mdo_xattr_get(env, snd_o, snd_hsm_buf, XATTR_NAME_HSM);
2044                 if (rc < 0)
2045                         GOTO(stop, rc);
2046
2047                 rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_hsm_buf,
2048                                            XATTR_NAME_HSM, LU_XATTR_REPLACE,
2049                                            handle);
2050                 if (rc < 0)
2051                         GOTO(stop, rc);
2052
2053                 rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_hsm_buf,
2054                                            XATTR_NAME_HSM, LU_XATTR_REPLACE,
2055                                            handle);
2056                 if (rc < 0)
2057                         GOTO(stop, rc);
2058         }
2059
2060         /* prepare transaction */
2061         rc = mdd_declare_xattr_set(env, mdd, fst_o, snd_buf, XATTR_NAME_LOV,
2062                                    fst_fl, handle);
2063         if (rc != 0)
2064                 GOTO(stop, rc);
2065
2066         if (fst_buf->lb_buf != NULL)
2067                 rc = mdd_declare_xattr_set(env, mdd, snd_o, fst_buf,
2068                                            XATTR_NAME_LOV, LU_XATTR_REPLACE,
2069                                            handle);
2070         else
2071                 rc = mdd_declare_xattr_del(env, mdd, snd_o, XATTR_NAME_LOV,
2072                                            handle);
2073         if (rc != 0)
2074                 GOTO(stop, rc);
2075
2076         rc = mdd_trans_start(env, mdd, handle);
2077         if (rc != 0)
2078                 GOTO(stop, rc);
2079
2080         if (flags & SWAP_LAYOUTS_MDS_HSM) {
2081                 rc = mdd_xattr_hsm_replace(env, fst_o, snd_hsm_buf, handle);
2082                 if (rc < 0)
2083                         GOTO(stop, rc);
2084
2085                 rc = mdd_xattr_hsm_replace(env, snd_o, fst_hsm_buf, handle);
2086                 if (rc < 0) {
2087                         rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf,
2088                                                     handle);
2089                         if (rc2 < 0)
2090                                 CERROR("%s: restore "DFID" HSM error: %d/%d\n",
2091                                        mdd_obj_dev_name(fst_o),
2092                                        PFID(mdo2fid(fst_o)), rc, rc2);
2093                         GOTO(stop, rc);
2094                 }
2095         }
2096
2097         rc = mdo_xattr_set(env, fst_o, snd_buf, XATTR_NAME_LOV, fst_fl, handle);
2098         if (rc != 0)
2099                 GOTO(stop, rc);
2100
2101         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_MDS_HSM_SWAP_LAYOUTS))) {
2102                 rc = -EOPNOTSUPP;
2103         } else {
2104                 if (fst_buf->lb_buf != NULL)
2105                         rc = mdo_xattr_set(env, snd_o, fst_buf, XATTR_NAME_LOV,
2106                                            LU_XATTR_REPLACE, handle);
2107                 else
2108                         rc = mdo_xattr_del(env, snd_o, XATTR_NAME_LOV, handle);
2109         }
2110         if (rc != 0)
2111                 GOTO(out_restore, rc);
2112
2113         /* Issue one changelog record per file */
2114         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, fst_o, handle);
2115         if (rc)
2116                 GOTO(stop, rc);
2117
2118         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, snd_o, handle);
2119         if (rc)
2120                 GOTO(stop, rc);
2121         EXIT;
2122
2123 out_restore:
2124         if (rc != 0) {
2125                 int steps = 0;
2126
2127                 /* failure on second file, but first was done, so we have
2128                  * to roll back first. */
2129                 if (fst_buf->lb_buf != NULL) {
2130                         mdd_set_lmm_oi(fst_lmm, saved_oi);
2131                         mdd_set_lmm_gen(fst_lmm, &saved_gen);
2132                         rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV,
2133                                             LU_XATTR_REPLACE, handle);
2134                 } else {
2135                         rc2 = mdo_xattr_del(env, fst_o, XATTR_NAME_LOV, handle);
2136                 }
2137                 if (rc2 < 0)
2138                         goto do_lbug;
2139
2140                 ++steps;
2141                 rc2 = mdd_xattr_hsm_replace(env, fst_o, fst_hsm_buf, handle);
2142                 if (rc2 < 0)
2143                         goto do_lbug;
2144
2145                 ++steps;
2146                 rc2 = mdd_xattr_hsm_replace(env, snd_o, snd_hsm_buf, handle);
2147
2148         do_lbug:
2149                 if (rc2 < 0) {
2150                         /* very bad day */
2151                         CERROR("%s: unable to roll back layout swap. FIDs: "
2152                                DFID" and "DFID "error: %d/%d, steps: %d\n",
2153                                mdd_obj_dev_name(fst_o),
2154                                PFID(mdo2fid(snd_o)), PFID(mdo2fid(fst_o)),
2155                                rc, rc2, steps);
2156                         /* a solution to avoid journal commit is to panic,
2157                          * but it has strong consequences so we use LBUG to
2158                          * allow sysdamin to choose to panic or not
2159                          */
2160                         LBUG();
2161                 }
2162         }
2163
2164 stop:
2165         rc = mdd_trans_stop(env, mdd, rc, handle);
2166
2167         mdd_write_unlock(env, snd_o);
2168         mdd_write_unlock(env, fst_o);
2169
2170         lu_buf_free(fst_buf);
2171         lu_buf_free(snd_buf);
2172         lu_buf_free(fst_hsm_buf);
2173         lu_buf_free(snd_hsm_buf);
2174
2175         if (!rc) {
2176                 (void) mdd_object_pfid_replace(env, fst_o);
2177                 (void) mdd_object_pfid_replace(env, snd_o);
2178         }
2179         return rc;
2180 }
2181
2182 static int mdd_declare_layout_change(const struct lu_env *env,
2183                                      struct mdd_device *mdd,
2184                                      struct mdd_object *obj,
2185                                      struct md_layout_change *mlc,
2186                                      struct thandle *handle)
2187 {
2188         int rc;
2189
2190         rc = mdo_declare_layout_change(env, obj, mlc, handle);
2191         if (rc)
2192                 return rc;
2193
2194         return mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
2195 }
2196
2197 /* For PFL, this is used to instantiate necessary component objects. */
2198 static int
2199 mdd_layout_instantiate_component(const struct lu_env *env,
2200                 struct mdd_object *obj, struct md_layout_change *mlc,
2201                 struct thandle *handle)
2202 {
2203         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2204         int rc;
2205         ENTRY;
2206
2207         if (mlc->mlc_opc != MD_LAYOUT_WRITE)
2208                 RETURN(-ENOTSUPP);
2209
2210         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2211         /**
2212          * It's possible that another layout write intent has already
2213          * instantiated our objects, so a -EALREADY returned, and we need to
2214          * do nothing.
2215          */
2216         if (rc)
2217                 RETURN(rc == -EALREADY ? 0 : rc);
2218
2219         rc = mdd_trans_start(env, mdd, handle);
2220         if (rc)
2221                 RETURN(rc);
2222
2223         mdd_write_lock(env, obj, MOR_TGT_CHILD);
2224         rc = mdo_layout_change(env, obj, mlc, handle);
2225         mdd_write_unlock(env, obj);
2226         if (rc)
2227                 RETURN(rc);
2228
2229         rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
2230         RETURN(rc);
2231 }
2232
2233 /**
2234  * Change the FLR layout from RDONLY to WRITE_PENDING.
2235  *
2236  * It picks the primary mirror, and bumps the layout version, and set
2237  * layout version xattr to OST objects in a sync tx. In order to facilitate
2238  * the handling of phantom writers from evicted clients, the clients carry
2239  * layout version of the file with write RPC, so that the OSTs can verify
2240  * if the write RPCs are legitimate, meaning not from evicted clients.
2241  */
2242 static int
2243 mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
2244                          struct md_layout_change *mlc, struct thandle *handle)
2245 {
2246         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2247         int rc;
2248         ENTRY;
2249
2250         /* Verify acceptable operations */
2251         switch (mlc->mlc_opc) {
2252         case MD_LAYOUT_WRITE:
2253                 break;
2254         case MD_LAYOUT_RESYNC:
2255                 /* these are legal operations - this represents the case that
2256                  * a few mirrors were missed in the last resync.
2257                  * XXX: it will be supported later */
2258         case MD_LAYOUT_RESYNC_DONE:
2259         default:
2260                 RETURN(0);
2261         }
2262
2263         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2264         if (rc)
2265                 GOTO(out, rc);
2266
2267         rc = mdd_declare_xattr_del(env, mdd, obj, XATTR_NAME_SOM, handle);
2268         if (rc)
2269                 GOTO(out, rc);
2270
2271         /* record a changelog for data mover to consume */
2272         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
2273         if (rc)
2274                 GOTO(out, rc);
2275
2276         rc = mdd_trans_start(env, mdd, handle);
2277         if (rc)
2278                 GOTO(out, rc);
2279
2280         /* it needs a sync tx to make FLR to work properly */
2281         handle->th_sync = 1;
2282
2283         mdd_write_lock(env, obj, MOR_TGT_CHILD);
2284         rc = mdo_layout_change(env, obj, mlc, handle);
2285         if (!rc) {
2286                 rc = mdo_xattr_del(env, obj, XATTR_NAME_SOM, handle);
2287                 if (rc == -ENODATA)
2288                         rc = 0;
2289         }
2290         mdd_write_unlock(env, obj);
2291         if (rc)
2292                 GOTO(out, rc);
2293
2294         rc = mdd_changelog_data_store(env, mdd, CL_FLRW, 0, obj, handle);
2295         if (rc)
2296                 GOTO(out, rc);
2297
2298         EXIT;
2299
2300 out:
2301         return rc;
2302 }
2303
2304 /**
2305  * Handle mirrored file state transition when it's in WRITE_PENDING.
2306  *
2307  * Only MD_LAYOUT_RESYNC, which represents start of resync, is allowed when
2308  * the file is in WRITE_PENDING state. If everything goes fine, the file's
2309  * layout version will be increased, and the file's state will be changed to
2310  * SYNC_PENDING.
2311  */
2312 static int
2313 mdd_layout_update_write_pending(const struct lu_env *env,
2314                 struct mdd_object *obj, struct md_layout_change *mlc,
2315                 struct thandle *handle)
2316 {
2317         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2318         int rc;
2319         ENTRY;
2320
2321         switch (mlc->mlc_opc) {
2322         case MD_LAYOUT_RESYNC:
2323                 /* Upon receiving the resync request, it should
2324                  * instantiate all stale components right away to get ready
2325                  * for mirror copy. In order to avoid layout version change,
2326                  * client should avoid sending LAYOUT_WRITE request at the
2327                  * resync state. */
2328                 break;
2329         case MD_LAYOUT_WRITE:
2330                 /* legal race for concurrent write, the file state has been
2331                  * changed by another client. */
2332                 break;
2333         default:
2334                 RETURN(-EBUSY);
2335         }
2336
2337         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2338         if (rc)
2339                 GOTO(out, rc);
2340
2341         rc = mdd_trans_start(env, mdd, handle);
2342         if (rc)
2343                 GOTO(out, rc);
2344
2345         /* it needs a sync tx to make FLR to work properly */
2346         handle->th_sync = 1;
2347
2348         mdd_write_lock(env, obj, MOR_TGT_CHILD);
2349         rc = mdo_layout_change(env, obj, mlc, handle);
2350         mdd_write_unlock(env, obj);
2351         if (rc)
2352                 GOTO(out, rc);
2353
2354         EXIT;
2355
2356 out:
2357         return rc;
2358 }
2359
2360 /**
2361  * Handle the requests when a FLR file's state is in SYNC_PENDING.
2362  *
2363  * Only concurrent write and sync complete requests are possible when the
2364  * file is in SYNC_PENDING. For the latter request, it will pass in the
2365  * mirrors that have been synchronized, then the stale bit will be cleared
2366  * to make the file's state turn into RDONLY.
2367  * For concurrent write reqeust, it just needs to change the file's state
2368  * to WRITE_PENDING in a sync tx. It doesn't have to change the layout
2369  * version because the version will be increased in the transition to
2370  * SYNC_PENDING later so that it can deny the write request from potential
2371  * evicted SYNC clients. */
2372 static int
2373 mdd_object_update_sync_pending(const struct lu_env *env, struct mdd_object *obj,
2374                 struct md_layout_change *mlc, struct thandle *handle)
2375 {
2376         struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
2377         struct lu_buf *som_buf = &mdd_env_info(env)->mti_buf[1];
2378         int fl = 0;
2379         int rc;
2380         ENTRY;
2381
2382         /* operation validation */
2383         switch (mlc->mlc_opc) {
2384         case MD_LAYOUT_RESYNC_DONE:
2385                 /* resync complete. */
2386         case MD_LAYOUT_WRITE:
2387                 /* concurrent write. */
2388                 break;
2389         case MD_LAYOUT_RESYNC:
2390                 /* resync again, most likely the previous run failed.
2391                  * no-op if it's already in SYNC_PENDING state */
2392                 RETURN(0);
2393         default:
2394                 RETURN(-EBUSY);
2395         }
2396
2397         if (mlc->mlc_som.lsa_valid & LSOM_FL_VALID) {
2398                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_SOM);
2399                 if (rc && rc != -ENODATA)
2400                         RETURN(rc);
2401
2402                 fl = rc == -ENODATA ? LU_XATTR_CREATE : LU_XATTR_REPLACE;
2403                 som_buf->lb_buf = &mlc->mlc_som;
2404                 som_buf->lb_len = sizeof(mlc->mlc_som);
2405         }
2406
2407         rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
2408         if (rc)
2409                 GOTO(out, rc);
2410
2411         /* record a changelog for the completion of resync */
2412         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
2413         if (rc)
2414                 GOTO(out, rc);
2415
2416         /* RESYNC_DONE has piggybacked size and blocks */
2417         if (fl) {
2418                 rc = mdd_declare_xattr_set(env, mdd, obj, som_buf,
2419                                            XATTR_NAME_SOM, fl, handle);
2420                 if (rc)
2421                         GOTO(out, rc);
2422         }
2423
2424         rc = mdd_trans_start(env, mdd, handle);
2425         if (rc)
2426                 GOTO(out, rc);
2427
2428         /* it needs a sync tx to make FLR to work properly */
2429         handle->th_sync = 1;
2430
2431         rc = mdo_layout_change(env, obj, mlc, handle);
2432         if (rc)
2433                 GOTO(out, rc);
2434
2435         if (fl) {
2436                 rc = mdo_xattr_set(env, obj, som_buf, XATTR_NAME_SOM,
2437                                    fl, handle);
2438                 if (rc)
2439                         GOTO(out, rc);
2440         }
2441
2442         rc = mdd_changelog_data_store(env, mdd, CL_RESYNC, 0, obj, handle);
2443         if (rc)
2444                 GOTO(out, rc);
2445         EXIT;
2446 out:
2447         return rc;
2448 }
2449
2450 /**
2451  * Layout change callback for object.
2452  *
2453  * This is only used by FLR for now. In the future, it can be exteneded to
2454  * handle all layout change.
2455  */
2456 static int
2457 mdd_layout_change(const struct lu_env *env, struct md_object *o,
2458                   struct md_layout_change *mlc)
2459 {
2460         struct mdd_object       *obj = md2mdd_obj(o);
2461         struct mdd_device       *mdd = mdd_obj2mdd_dev(obj);
2462         struct lu_buf           *buf = mdd_buf_get(env, NULL, 0);
2463         struct lov_comp_md_v1   *lcm;
2464         struct thandle          *handle;
2465         int flr_state;
2466         int rc;
2467         ENTRY;
2468
2469         /* Verify acceptable operations */
2470         switch (mlc->mlc_opc) {
2471         case MD_LAYOUT_WRITE:
2472         case MD_LAYOUT_RESYNC:
2473         case MD_LAYOUT_RESYNC_DONE:
2474                 break;
2475         default:
2476                 RETURN(-ENOTSUPP);
2477         }
2478
2479         handle = mdd_trans_create(env, mdd);
2480         if (IS_ERR(handle))
2481                 RETURN(PTR_ERR(handle));
2482
2483         rc = mdd_get_lov_ea(env, obj, buf);
2484         if (rc < 0) {
2485                 if (rc == -ENODATA)
2486                         rc = -EINVAL;
2487                 GOTO(out, rc);
2488         }
2489
2490         /* analyze the layout to make sure it's a FLR file */
2491         lcm = buf->lb_buf;
2492         if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
2493                 GOTO(out, rc = -EINVAL);
2494
2495         flr_state = le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK;
2496
2497         /* please refer to HLD of FLR for state transition */
2498         switch (flr_state) {
2499         case LCM_FL_NOT_FLR:
2500                 rc = mdd_layout_instantiate_component(env, obj, mlc, handle);
2501                 break;
2502         case LCM_FL_WRITE_PENDING:
2503                 rc = mdd_layout_update_write_pending(env, obj, mlc, handle);
2504                 break;
2505         case LCM_FL_RDONLY:
2506                 rc = mdd_layout_update_rdonly(env, obj, mlc, handle);
2507                 break;
2508         case LCM_FL_SYNC_PENDING:
2509                 rc = mdd_object_update_sync_pending(env, obj, mlc, handle);
2510                 break;
2511         default:
2512                 rc = 0;
2513                 break;
2514         }
2515         EXIT;
2516
2517 out:
2518         mdd_trans_stop(env, mdd, rc, handle);
2519         lu_buf_free(buf);
2520         return rc;
2521 }
2522
2523 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
2524                           struct mdd_object *child, const struct lu_attr *attr,
2525                           const struct md_op_spec *spec,
2526                           struct dt_allocation_hint *hint)
2527 {
2528         struct dt_object *np = parent ?  mdd_object_child(parent) : NULL;
2529         struct dt_object *nc = mdd_object_child(child);
2530
2531         memset(hint, 0, sizeof(*hint));
2532
2533         /* For striped directory, give striping EA to lod_ah_init, which will
2534          * decide the stripe_offset and stripe count by it. */
2535         if (S_ISDIR(attr->la_mode) &&
2536             unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
2537                 hint->dah_eadata = spec->u.sp_ea.eadata;
2538                 hint->dah_eadata_len = spec->u.sp_ea.eadatalen;
2539         } else {
2540                 hint->dah_eadata = NULL;
2541                 hint->dah_eadata_len = 0;
2542         }
2543
2544         CDEBUG(D_INFO, DFID" eadata %p len %d\n", PFID(mdd_object_fid(child)),
2545                hint->dah_eadata, hint->dah_eadata_len);
2546         /* @hint will be initialized by underlying device. */
2547         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
2548 }
2549
2550 /*
2551  * do NOT or the MAY_*'s, you'll get the weakest
2552  */
2553 int accmode(const struct lu_env *env, const struct lu_attr *la, int flags)
2554 {
2555         int res = 0;
2556
2557         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2558          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2559          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2560          * owner can write to a file even if it is marked readonly to hide
2561          * its brokenness. (bug 5781) */
2562         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2563                 struct lu_ucred *uc = lu_ucred_check(env);
2564
2565                 if ((uc == NULL) || (la->la_uid == uc->uc_fsuid))
2566                         return 0;
2567         }
2568
2569         if (flags & FMODE_READ)
2570                 res |= MAY_READ;
2571         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2572                 res |= MAY_WRITE;
2573         if (flags & MDS_FMODE_EXEC)
2574                 res = MAY_EXEC;
2575         return res;
2576 }
2577
2578 static int mdd_open_sanity_check(const struct lu_env *env,
2579                                 struct mdd_object *obj,
2580                                 const struct lu_attr *attr, int flag)
2581 {
2582         int mode, rc;
2583         ENTRY;
2584
2585         /* EEXIST check */
2586         if (mdd_is_dead_obj(obj))
2587                 RETURN(-ENOENT);
2588
2589         if (S_ISLNK(attr->la_mode))
2590                 RETURN(-ELOOP);
2591
2592         mode = accmode(env, attr, flag);
2593
2594         if (S_ISDIR(attr->la_mode) && (mode & MAY_WRITE))
2595                 RETURN(-EISDIR);
2596
2597         if (!(flag & MDS_OPEN_CREATED)) {
2598                 rc = mdd_permission_internal(env, obj, attr, mode);
2599                 if (rc)
2600                         RETURN(rc);
2601         }
2602
2603         if (S_ISFIFO(attr->la_mode) || S_ISSOCK(attr->la_mode) ||
2604             S_ISBLK(attr->la_mode) || S_ISCHR(attr->la_mode))
2605                 flag &= ~MDS_OPEN_TRUNC;
2606
2607         /* For writing append-only file must open it with append mode. */
2608         if (attr->la_flags & LUSTRE_APPEND_FL) {
2609                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2610                         RETURN(-EPERM);
2611                 if (flag & MDS_OPEN_TRUNC)
2612                         RETURN(-EPERM);
2613         }
2614
2615         RETURN(0);
2616 }
2617
2618 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2619                     int flags)
2620 {
2621         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2622         struct md_device *md_dev = lu2md_dev(mdd2lu_dev(mdo2mdd(obj)));
2623         struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
2624         int rc = 0;
2625
2626         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2627
2628         rc = mdd_la_get(env, mdd_obj, attr);
2629         if (rc != 0)
2630                 GOTO(out, rc);
2631
2632         rc = mdd_open_sanity_check(env, mdd_obj, attr, flags);
2633         if (rc != 0)
2634                 GOTO(out, rc);
2635
2636         mdd_obj->mod_count++;
2637
2638         mdd_changelog(env, CL_OPEN, flags, md_dev, mdo2fid(mdd_obj));
2639
2640         EXIT;
2641 out:
2642         mdd_write_unlock(env, mdd_obj);
2643         return rc;
2644 }
2645
2646 static int mdd_declare_close(const struct lu_env *env,
2647                              struct mdd_object *obj,
2648                              struct md_attr *ma,
2649                              struct thandle *handle)
2650 {
2651         int rc;
2652
2653         rc = orph_declare_index_delete(env, obj, handle);
2654         if (rc)
2655                 return rc;
2656
2657         return mdo_declare_destroy(env, obj, handle);
2658 }
2659
2660 /*
2661  * No permission check is needed.
2662  */
2663 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2664                      struct md_attr *ma, int mode)
2665 {
2666         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2667         struct mdd_device *mdd = mdo2mdd(obj);
2668         struct thandle *handle = NULL;
2669         int is_orphan = 0;
2670         int rc;
2671         bool blocked = false;
2672         ENTRY;
2673
2674         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2675                 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2676                 mdd_obj->mod_count--;
2677                 mdd_write_unlock(env, mdd_obj);
2678
2679                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2680                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2681                                 "list\n", PFID(mdd_object_fid(mdd_obj)));
2682                 RETURN(0);
2683         }
2684
2685         /* mdd_finish_unlink() will always set orphan object as DEAD_OBJ, but
2686          * it might fail to add the object to orphan list (w/o ORPHAN_OBJ). */
2687         /* check without any lock */
2688         is_orphan = mdd_obj->mod_count == 1 &&
2689                     (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0;
2690
2691 again:
2692         if (is_orphan) {
2693                 /* mdd_trans_create() maybe failed because of barrier_entry(),
2694                  * under such case, the orphan MDT-object will be left in the
2695                  * orphan list, and when the MDT remount next time, the unused
2696                  * orphans will be destroyed automatically.
2697                  *
2698                  * One exception: the former mdd_finish_unlink may failed to
2699                  * add the orphan MDT-object to the orphan list, then if the
2700                  * mdd_trans_create() failed because of barrier_entry(), the
2701                  * MDT-object will become real orphan that is neither in the
2702                  * namespace nor in the orphan list. Such bad case should be
2703                  * very rare and will be handled by e2fsck/lfsck. */
2704                 handle = mdd_trans_create(env, mdo2mdd(obj));
2705                 if (IS_ERR(handle)) {
2706                         rc = PTR_ERR(handle);
2707                         if (rc != -EINPROGRESS)
2708                                 GOTO(stop, rc);
2709
2710                         handle = NULL;
2711                         blocked = true;
2712                         goto cont;
2713                 }
2714
2715                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2716                 if (rc)
2717                         GOTO(stop, rc);
2718
2719                 rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
2720                 if (rc)
2721                         GOTO(stop, rc);
2722
2723                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2724                 if (rc)
2725                         GOTO(stop, rc);
2726         }
2727
2728 cont:
2729         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2730         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
2731         if (rc != 0) {
2732                 CERROR("Failed to get lu_attr of "DFID": %d\n",
2733                        PFID(mdd_object_fid(mdd_obj)), rc);
2734                 GOTO(out, rc);
2735         }
2736
2737         /* check again with lock */
2738         is_orphan = (mdd_obj->mod_count == 1) &&
2739                     ((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
2740                      ma->ma_attr.la_nlink == 0);
2741
2742         if (is_orphan && !handle && !blocked) {
2743                 mdd_write_unlock(env, mdd_obj);
2744                 goto again;
2745         }
2746
2747         mdd_obj->mod_count--; /*release open count */
2748
2749         if (!is_orphan || blocked)
2750                 GOTO(out, rc = 0);
2751
2752         /* Orphan object */
2753         /* NB: Object maybe not in orphan list originally, it is rare case for
2754          * mdd_finish_unlink() failure, in that case, the object doesn't have
2755          * ORPHAN_OBJ flag */
2756         if ((mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2757                 /* remove link to object from orphan index */
2758                 LASSERT(handle != NULL);
2759                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2760                 if (rc != 0) {
2761                         CERROR("%s: unable to delete "DFID" from orphan list: "
2762                                "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
2763                                PFID(mdd_object_fid(mdd_obj)), rc);
2764                         /* If object was not deleted from orphan list, do not
2765                          * destroy OSS objects, which will be done when next
2766                          * recovery. */
2767                         GOTO(out, rc);
2768                 }
2769
2770                 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2771                        "list, OSS objects to be destroyed.\n",
2772                        PFID(mdd_object_fid(mdd_obj)));
2773         }
2774
2775         rc = mdo_destroy(env, mdd_obj, handle);
2776
2777         if (rc != 0) {
2778                 CERROR("%s: unable to delete "DFID" from orphan list: "
2779                        "rc = %d\n", lu_dev_name(mdd2lu_dev(mdd)),
2780                        PFID(mdd_object_fid(mdd_obj)), rc);
2781         }
2782         EXIT;
2783
2784 out:
2785         mdd_write_unlock(env, mdd_obj);
2786
2787         /* Record CL_CLOSE in changelog only if file was opened in write mode,
2788          * or if CL_OPEN was recorded.
2789          * Changelogs mask may change between open and close operations, but
2790          * this is not a big deal if we have a CL_CLOSE entry with no matching
2791          * CL_OPEN. Plus Changelogs mask may not change often.
2792          */
2793         if (!rc && !blocked &&
2794             ((mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) ||
2795              (mdd->mdd_cl.mc_mask & (1 << CL_OPEN))) &&
2796             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2797                 if (handle == NULL) {
2798                         handle = mdd_trans_create(env, mdo2mdd(obj));
2799                         if (IS_ERR(handle))
2800                                 GOTO(stop, rc = PTR_ERR(handle));
2801
2802                         rc = mdd_declare_changelog_store(env, mdd, NULL, NULL,
2803                                                          handle);
2804                         if (rc)
2805                                 GOTO(stop, rc);
2806
2807                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2808                         if (rc)
2809                                 GOTO(stop, rc);
2810                 }
2811
2812                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2813                                          mdd_obj, handle);
2814         }
2815
2816 stop:
2817         if (handle != NULL && !IS_ERR(handle))
2818                 rc = mdd_trans_stop(env, mdd, rc, handle);
2819
2820         return rc;
2821 }
2822
2823 /*
2824  * Permission check is done when open,
2825  * no need check again.
2826  */
2827 static int mdd_readpage_sanity_check(const struct lu_env *env,
2828                                      struct mdd_object *obj)
2829 {
2830         struct dt_object *next = mdd_object_child(obj);
2831         int rc;
2832         ENTRY;
2833
2834         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2835                 rc = 0;
2836         else
2837                 rc = -ENOTDIR;
2838
2839         RETURN(rc);
2840 }
2841
2842 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2843                               size_t nob, const struct dt_it_ops *iops,
2844                               struct dt_it *it, __u32 attr, void *arg)
2845 {
2846         struct lu_dirpage       *dp = &lp->lp_dir;
2847         void                    *area = dp;
2848         int                      result;
2849         __u64                    hash = 0;
2850         struct lu_dirent        *ent;
2851         struct lu_dirent        *last = NULL;
2852         struct lu_fid            fid;
2853         int                      first = 1;
2854
2855         if (nob < sizeof(*dp))
2856                 return -EINVAL;
2857
2858         memset(area, 0, sizeof (*dp));
2859         area += sizeof (*dp);
2860         nob  -= sizeof (*dp);
2861
2862         ent  = area;
2863         do {
2864                 int    len;
2865                 size_t recsize;
2866
2867                 len = iops->key_size(env, it);
2868
2869                 /* IAM iterator can return record with zero len. */
2870                 if (len == 0)
2871                         goto next;
2872
2873                 hash = iops->store(env, it);
2874                 if (unlikely(first)) {
2875                         first = 0;
2876                         dp->ldp_hash_start = cpu_to_le64(hash);
2877                 }
2878
2879                 /* calculate max space required for lu_dirent */
2880                 recsize = lu_dirent_calc_size(len, attr);
2881
2882                 if (nob >= recsize) {
2883                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2884                         if (result == -ESTALE)
2885                                 goto next;
2886                         if (result != 0)
2887                                 goto out;
2888
2889                         /* osd might not able to pack all attributes,
2890                          * so recheck rec length */
2891                         recsize = le16_to_cpu(ent->lde_reclen);
2892
2893                         if (le32_to_cpu(ent->lde_attrs) & LUDA_FID) {
2894                                 fid_le_to_cpu(&fid, &ent->lde_fid);
2895                                 if (fid_is_dot_lustre(&fid))
2896                                         goto next;
2897                         }
2898                 } else {
2899                         result = (last != NULL) ? 0 :-EINVAL;
2900                         goto out;
2901                 }
2902                 last = ent;
2903                 ent = (void *)ent + recsize;
2904                 nob -= recsize;
2905
2906 next:
2907                 result = iops->next(env, it);
2908                 if (result == -ESTALE)
2909                         goto next;
2910         } while (result == 0);
2911
2912 out:
2913         dp->ldp_hash_end = cpu_to_le64(hash);
2914         if (last != NULL) {
2915                 if (last->lde_hash == dp->ldp_hash_end)
2916                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2917                 last->lde_reclen = 0; /* end mark */
2918         }
2919         if (result > 0)
2920                 /* end of directory */
2921                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2922         else if (result < 0)
2923                 CWARN("build page failed: %d!\n", result);
2924         return result;
2925 }
2926
2927 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2928                  const struct lu_rdpg *rdpg)
2929 {
2930         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2931         int rc;
2932         ENTRY;
2933
2934         if (mdd_object_exists(mdd_obj) == 0) {
2935                 CERROR("%s: object "DFID" not found: rc = -2\n",
2936                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2937                 return -ENOENT;
2938         }
2939
2940         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2941         rc = mdd_readpage_sanity_check(env, mdd_obj);
2942         if (rc)
2943                 GOTO(out_unlock, rc);
2944
2945         if (mdd_is_dead_obj(mdd_obj)) {
2946                 struct page *pg;
2947                 struct lu_dirpage *dp;
2948
2949                 /*
2950                  * According to POSIX, please do not return any entry to client:
2951                  * even dot and dotdot should not be returned.
2952                  */
2953                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2954                        PFID(mdd_object_fid(mdd_obj)));
2955
2956                 if (rdpg->rp_count <= 0)
2957                         GOTO(out_unlock, rc = -EFAULT);
2958                 LASSERT(rdpg->rp_pages != NULL);
2959
2960                 pg = rdpg->rp_pages[0];
2961                 dp = (struct lu_dirpage *)kmap(pg);
2962                 memset(dp, 0 , sizeof(struct lu_dirpage));
2963                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2964                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2965                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2966                 kunmap(pg);
2967                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2968         }
2969
2970         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2971                            mdd_dir_page_build, NULL);
2972         if (rc >= 0) {
2973                 struct lu_dirpage       *dp;
2974
2975                 dp = kmap(rdpg->rp_pages[0]);
2976                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2977                 if (rc == 0) {
2978                         /*
2979                          * No pages were processed, mark this for first page
2980                          * and send back.
2981                          */
2982                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2983                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2984                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2985                 }
2986                 kunmap(rdpg->rp_pages[0]);
2987         }
2988
2989         GOTO(out_unlock, rc);
2990 out_unlock:
2991         mdd_read_unlock(env, mdd_obj);
2992         return rc;
2993 }
2994
2995 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2996 {
2997         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2998
2999         if (mdd_object_exists(mdd_obj) == 0) {
3000                 int rc = -ENOENT;
3001
3002                 CERROR("%s: object "DFID" not found: rc = %d\n",
3003                        mdd_obj_dev_name(mdd_obj),
3004                        PFID(mdd_object_fid(mdd_obj)), rc);
3005                 return rc;
3006         }
3007         return dt_object_sync(env, mdd_object_child(mdd_obj),
3008                               0, OBD_OBJECT_EOF);
3009 }
3010
3011 static int mdd_object_lock(const struct lu_env *env,
3012                            struct md_object *obj,
3013                            struct lustre_handle *lh,
3014                            struct ldlm_enqueue_info *einfo,
3015                            union ldlm_policy_data *policy)
3016 {
3017         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3018         return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
3019                               einfo, policy);
3020 }
3021
3022 static int mdd_object_unlock(const struct lu_env *env,
3023                              struct md_object *obj,
3024                              struct ldlm_enqueue_info *einfo,
3025                              union ldlm_policy_data *policy)
3026 {
3027         struct mdd_object *mdd_obj = md2mdd_obj(obj);
3028         return dt_object_unlock(env, mdd_object_child(mdd_obj), einfo, policy);
3029 }
3030
3031 const struct md_object_operations mdd_obj_ops = {
3032         .moo_permission         = mdd_permission,
3033         .moo_attr_get           = mdd_attr_get,
3034         .moo_attr_set           = mdd_attr_set,
3035         .moo_xattr_get          = mdd_xattr_get,
3036         .moo_xattr_set          = mdd_xattr_set,
3037         .moo_xattr_list         = mdd_xattr_list,
3038         .moo_invalidate         = mdd_invalidate,
3039         .moo_xattr_del          = mdd_xattr_del,
3040         .moo_swap_layouts       = mdd_swap_layouts,
3041         .moo_open               = mdd_open,
3042         .moo_close              = mdd_close,
3043         .moo_readpage           = mdd_readpage,
3044         .moo_readlink           = mdd_readlink,
3045         .moo_changelog          = mdd_changelog,
3046         .moo_object_sync        = mdd_object_sync,
3047         .moo_object_lock        = mdd_object_lock,
3048         .moo_object_unlock      = mdd_object_unlock,
3049         .moo_layout_change      = mdd_layout_change,
3050 };