Whamcloud - gitweb
b=17491
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: wangdi <wangdi@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <obd_lov.h>
55 #include <lprocfs_status.h>
56 #include <lustre_mds.h>
57 #include <lustre_fid.h>
58 #include <lustre/lustre_idl.h>
59
60 #include "mdd_internal.h"
61
62 static int mdd_notify(struct obd_device *host, struct obd_device *watched,
63                       enum obd_notify_event ev, void *owner)
64 {
65         struct mdd_device *mdd = owner;
66         int rc = 0;
67         ENTRY;
68
69         LASSERT(owner != NULL);
70         switch (ev)
71         {
72                 case OBD_NOTIFY_ACTIVE:
73                 case OBD_NOTIFY_SYNC:
74                 case OBD_NOTIFY_SYNC_NONBLOCK:
75                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_SYNC);
76                         break;
77                 case OBD_NOTIFY_CONFIG:
78                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_CONFIG);
79                         break;
80 #ifdef HAVE_QUOTA_SUPPORT
81                 case OBD_NOTIFY_QUOTA:
82                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_QUOTA);
83                         break;
84 #endif
85                 default:
86                         CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
87         }
88
89         RETURN(rc);
90 }
91
92 /* The obd is created for handling data stack for mdd */
93 int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
94                  struct lustre_cfg *cfg)
95 {
96         char                   *dev = lustre_cfg_string(cfg, 0);
97         int                     rc, name_size, uuid_size;
98         char                   *name, *uuid;
99         __u32                   mds_id;
100         struct lustre_cfg_bufs *bufs;
101         struct lustre_cfg      *lcfg;
102         struct obd_device      *obd;
103         ENTRY;
104
105         mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
106         name_size = strlen(MDD_OBD_NAME) + 35;
107         uuid_size = strlen(MDD_OBD_UUID) + 35;
108
109         OBD_ALLOC(name, name_size);
110         OBD_ALLOC(uuid, uuid_size);
111         if (name == NULL || uuid == NULL)
112                 GOTO(cleanup_mem, rc = -ENOMEM);
113
114         OBD_ALLOC_PTR(bufs);
115         if (!bufs)
116                 GOTO(cleanup_mem, rc = -ENOMEM);
117
118         snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s-%d",
119                  MDD_OBD_NAME, dev, mds_id);
120
121         snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s-%d",
122                  MDD_OBD_UUID, dev, mds_id);
123
124         lustre_cfg_bufs_reset(bufs, name);
125         lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE);
126         lustre_cfg_bufs_set_string(bufs, 2, uuid);
127         lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */);
128         lustre_cfg_bufs_set_string(bufs, 4, (char*)dev);
129
130         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
131         OBD_FREE_PTR(bufs);
132         if (!lcfg)
133                 GOTO(cleanup_mem, rc = -ENOMEM);
134
135         rc = class_attach(lcfg);
136         if (rc)
137                 GOTO(lcfg_cleanup, rc);
138
139         obd = class_name2obd(name);
140         if (!obd) {
141                 CERROR("Can not find obd %s\n", MDD_OBD_NAME);
142                 LBUG();
143         }
144
145         obd->obd_recovering = 1;
146         obd->u.mds.mds_id = mds_id;
147         rc = class_setup(obd, lcfg);
148         if (rc)
149                 GOTO(class_detach, rc);
150
151         /*
152          * Add here for obd notify mechanism, when adding a new ost, the mds
153          * will notify this mdd. The mds will be used for quota also.
154          */
155         obd->obd_upcall.onu_upcall = mdd_notify;
156         obd->obd_upcall.onu_owner = mdd;
157         mdd->mdd_obd_dev = obd;
158         EXIT;
159 class_detach:
160         if (rc)
161                 class_detach(obd, lcfg);
162 lcfg_cleanup:
163         lustre_cfg_free(lcfg);
164 cleanup_mem:
165         if (name)
166                 OBD_FREE(name, name_size);
167         if (uuid)
168                 OBD_FREE(uuid, uuid_size);
169         return rc;
170 }
171
172 int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd,
173                  struct lustre_cfg *lcfg)
174 {
175         struct obd_device      *obd;
176         int rc;
177         ENTRY;
178
179         obd = mdd2obd_dev(mdd);
180         LASSERT(obd);
181
182         rc = class_cleanup(obd, lcfg);
183         if (rc)
184                 GOTO(lcfg_cleanup, rc);
185
186         obd->obd_upcall.onu_upcall = NULL;
187         obd->obd_upcall.onu_owner = NULL;
188         rc = class_detach(obd, lcfg);
189         if (rc)
190                 GOTO(lcfg_cleanup, rc);
191         mdd->mdd_obd_dev = NULL;
192
193         EXIT;
194 lcfg_cleanup:
195         return rc;
196 }
197
198 int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
199                void *md, int *md_size, const char *name)
200 {
201         int rc;
202         ENTRY;
203
204         rc = mdo_xattr_get(env, obj, mdd_buf_get(env, md, *md_size), name,
205                            mdd_object_capa(env, obj));
206         /*
207          * XXX: Handling of -ENODATA, the right way is to have ->do_md_get()
208          * exported by dt layer.
209          */
210         if (rc == 0 || rc == -ENODATA) {
211                 *md_size = 0;
212                 rc = 0;
213         } else if (rc < 0) {
214                 CERROR("Error %d reading eadata - %d\n", rc, *md_size);
215         } else {
216                 /* XXX: Convert lov EA but fixed after verification test. */
217                 *md_size = rc;
218         }
219
220         RETURN(rc);
221 }
222
223 int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
224                       void *md, int *md_size, const char *name)
225 {
226         int rc = 0;
227         mdd_read_lock(env, obj, MOR_TGT_CHILD);
228         rc = mdd_get_md(env, obj, md, md_size, name);
229         mdd_read_unlock(env, obj);
230         return rc;
231 }
232
233 static int mdd_lov_set_stripe_md(const struct lu_env *env,
234                                  struct mdd_object *obj, struct lu_buf *buf,
235                                  struct thandle *handle)
236 {
237         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
238         struct obd_device       *obd = mdd2obd_dev(mdd);
239         struct obd_export       *lov_exp = obd->u.mds.mds_osc_exp;
240         struct lov_stripe_md    *lsm = NULL;
241         int rc;
242         ENTRY;
243
244         LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
245         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0,
246                            &lsm, buf->lb_buf);
247         if (rc)
248                 RETURN(rc);
249         obd_free_memmd(lov_exp, &lsm);
250
251         rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle);
252
253         CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
254         RETURN(rc);
255 }
256
257 /*
258  * Permission check is done before call it,
259  * no need check again.
260  */
261 static int mdd_lov_set_dir_md(const struct lu_env *env,
262                               struct mdd_object *obj, struct lu_buf *buf,
263                               struct thandle *handle)
264 {
265         struct lov_user_md *lum = NULL;
266         int rc = 0;
267         ENTRY;
268
269         LASSERT(S_ISDIR(mdd_object_type(obj)));
270         lum = (struct lov_user_md*)buf->lb_buf;
271
272         /* if { size, offset, count } = { 0, -1, 0 } and no pool (i.e. all default
273          * values specified) then delete default striping from dir. */
274         if (lum->lmm_stripe_size == 0 && lum->lmm_stripe_count == 0 &&
275             lum->lmm_stripe_offset == (typeof(lum->lmm_stripe_offset))(-1) &&
276             lum->lmm_magic != LOV_USER_MAGIC_V3) {
277                 rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL,
278                                        XATTR_NAME_LOV, 0, handle);
279                 if (rc == -ENODATA)
280                         rc = 0;
281                 CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
282                                 PFID(mdo2fid(obj)), rc);
283         } else {
284                 rc = mdd_lov_set_stripe_md(env, obj, buf, handle);
285         }
286         RETURN(rc);
287 }
288
289 int mdd_lsm_sanity_check(const struct lu_env *env,  struct mdd_object *obj)
290 {
291         struct lu_attr   *tmp_la = &mdd_env_info(env)->mti_la;
292         struct md_ucred  *uc     = md_ucred(env);
293         int rc;
294         ENTRY;
295
296         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
297         if (rc)
298                 RETURN(rc);
299
300         if ((uc->mu_fsuid != tmp_la->la_uid) &&
301             !mdd_capable(uc, CFS_CAP_FOWNER))
302                 rc = mdd_permission_internal_locked(env, obj, tmp_la,
303                                                     MAY_WRITE, MOR_TGT_CHILD);
304
305         RETURN(rc);
306 }
307
308 int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
309                    struct mdd_object *child, struct lov_mds_md *lmmp,
310                    int lmm_size, struct thandle *handle, int set_stripe)
311 {
312         struct lu_buf *buf;
313         umode_t mode;
314         int rc = 0;
315         ENTRY;
316
317         buf = mdd_buf_get(env, lmmp, lmm_size);
318         mode = mdd_object_type(child);
319         if (S_ISREG(mode) && lmm_size > 0) {
320                 if (set_stripe) {
321                         rc = mdd_lov_set_stripe_md(env, child, buf, handle);
322                 } else {
323                         rc = mdd_xattr_set_txn(env, child, buf,
324                                                XATTR_NAME_LOV, 0, handle);
325                 }
326         } else if (S_ISDIR(mode)) {
327                 if (lmmp == NULL && lmm_size == 0) {
328                         struct mdd_device *mdd = mdd_obj2mdd_dev(child);
329                         struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd);
330                         int size = sizeof(struct lov_mds_md_v3);
331
332                         /* Get parent dir stripe and set */
333                         if (pobj != NULL)
334                                 rc = mdd_get_md_locked(env, pobj, lmm, &size,
335                                                        XATTR_NAME_LOV);
336                         if (rc > 0) {
337                                 buf = mdd_buf_get(env, lmm, size);
338                                 rc = mdd_xattr_set_txn(env, child, buf,
339                                                XATTR_NAME_LOV, 0, handle);
340                                 if (rc)
341                                         CERROR("error on copy stripe info: rc "
342                                                 "= %d\n", rc);
343                         }
344                 } else {
345                         LASSERT(lmmp != NULL && lmm_size > 0);
346                         rc = mdd_lov_set_dir_md(env, child, buf, handle);
347                 }
348         }
349         CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n",
350                         lmmp, lmm_size, PFID(mdo2fid(child)), rc);
351         RETURN(rc);
352 }
353
354 /*
355  * XXX: this is for create lsm object id, which should identify the lsm object
356  * unique in the whole mds, as I see. But it seems, we still not need it
357  * now. Right? So just borrow the ll_fid_build_ino().
358  */
359 static obd_id mdd_lov_create_id(const struct lu_fid *fid)
360 {
361         return fid_flatten(fid);
362 }
363
364 int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
365 {
366         /* copy mds_lov code is using wrong layer */
367         return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
368 }
369
370 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
371 {
372         /* copy mds_lov code is using wrong layer */
373         mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
374 }
375
376 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
377                            struct lov_mds_md *lmm, int lmm_size,
378                            const struct md_op_spec *spec)
379 {
380         if (lmm && !spec->no_create)
381                 OBD_FREE(lmm, lmm_size);
382 }
383
384 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
385                    struct mdd_object *parent, struct mdd_object *child,
386                    struct lov_mds_md **lmm, int *lmm_size,
387                    const struct md_op_spec *spec, struct lu_attr *la)
388 {
389         struct obd_device     *obd = mdd2obd_dev(mdd);
390         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
391         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
392         struct obdo           *oa;
393         struct lov_stripe_md  *lsm = NULL;
394         const void            *eadata = spec->u.sp_ea.eadata;
395         __u32                  create_flags = spec->sp_cr_flags;
396         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
397         int                    rc = 0;
398         ENTRY;
399
400         if (!md_should_create(create_flags))
401                 RETURN(0);
402
403         oti_init(oti, NULL);
404
405         /* replay case, has objects already, only get lov from eadata */
406         if (spec->no_create != 0) {
407                 *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
408                 *lmm_size = spec->u.sp_ea.eadatalen;
409                 RETURN(0);
410         }
411
412         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO))
413                 GOTO(out_ids, rc = -ENOMEM);
414
415         LASSERT(lov_exp != NULL);
416         oa = &mdd_env_info(env)->mti_oa;
417
418         oa->o_uid = 0; /* must have 0 uid / gid on OST */
419         oa->o_gid = 0;
420         oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
421         oa->o_mode = S_IFREG | 0600;
422         oa->o_id = mdd_lov_create_id(mdd_object_fid(child));
423         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
424                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
425         oa->o_size = 0;
426
427         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
428                 if (create_flags & MDS_OPEN_HAS_EA) {
429                         LASSERT(eadata != NULL);
430                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
431                                            0, &lsm, (void*)eadata);
432                         if (rc)
433                                 GOTO(out_oti, rc);
434                         lsm->lsm_object_id = oa->o_id;
435                         lsm->lsm_object_gr = oa->o_gr;
436                 } else if (parent != NULL) {
437                         /* get lov ea from parent and set to lov */
438                         struct lov_mds_md *_lmm;
439                         int _lmm_size;
440
441                         _lmm_size = mdd_lov_mdsize(env, mdd);
442                         _lmm = mdd_max_lmm_get(env, mdd);
443
444                         if (_lmm == NULL)
445                                 GOTO(out_oti, rc = -ENOMEM);
446
447                         rc = mdd_get_md_locked(env, parent, _lmm,
448                                                &_lmm_size,
449                                                XATTR_NAME_LOV);
450                         if (rc > 0)
451                                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
452                                                    lov_exp, 0, &lsm, _lmm);
453                         if (rc)
454                                 GOTO(out_oti, rc);
455                 }
456
457                 rc = obd_create(lov_exp, oa, &lsm, oti);
458                 if (rc) {
459                         if (rc > 0) {
460                                 CERROR("Create error for "DFID": %d\n",
461                                        PFID(mdo2fid(child)), rc);
462                                 rc = -EIO;
463                         }
464                         GOTO(out_oti, rc);
465                 }
466                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
467         } else {
468                 LASSERT(eadata != NULL);
469                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
470                                    (void*)eadata);
471                 if (rc)
472                         GOTO(out_oti, rc);
473                 lsm->lsm_object_id = oa->o_id;
474                 lsm->lsm_object_gr = oa->o_gr;
475         }
476
477         /*
478          * Sometimes, we may truncate some object(without lsm) then open it
479          * (with write flags), so creating lsm above.  The Nonzero(truncated)
480          * size should tell ost, since size attr is in charge by OST.
481          */
482         if (la->la_size && la->la_valid & LA_SIZE) {
483                 struct obd_info *oinfo = &mdd_env_info(env)->mti_oi;
484
485                 memset(oinfo, 0, sizeof(*oinfo));
486
487                 /* When setting attr to ost, FLBKSZ is not needed. */
488                 oa->o_valid &= ~OBD_MD_FLBLKSZ;
489                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
490                              OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
491
492                 /*
493                  * XXX: Pack lustre id to OST, in OST, it will be packed by
494                  * filter_fid, but can not see what is the usages. So just pack
495                  * o_seq o_ver here, maybe fix it after this cycle.
496                  */
497                 oa->o_fid = fid_seq(mdd_object_fid(child));
498                 oa->o_generation = fid_oid(mdd_object_fid(child));
499                 oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
500                 oinfo->oi_oa = oa;
501                 oinfo->oi_md = lsm;
502                 oinfo->oi_capa = NULL;
503                 oinfo->oi_policy.l_extent.start = la->la_size;
504                 oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF;
505
506                 rc = obd_punch_rqset(lov_exp, oinfo, oti);
507                 if (rc) {
508                         CERROR("Error setting attrs for "DFID": rc %d\n",
509                                PFID(mdo2fid(child)), rc);
510                         if (rc > 0) {
511                                 CERROR("obd_setattr for "DFID" rc %d\n",
512                                         PFID(mdo2fid(child)), rc);
513                                 rc = -EIO;
514                         }
515                         GOTO(out_oti, rc);
516                 }
517         }
518
519         /* blksize should be changed after create data object */
520         la->la_valid |= LA_BLKSIZE;
521         la->la_blksize = oa->o_blksize;
522         *lmm = NULL;
523         rc = obd_packmd(lov_exp, lmm, lsm);
524         if (rc < 0) {
525                 CERROR("Cannot pack lsm, err = %d\n", rc);
526                 GOTO(out_oti, rc);
527         }
528         if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
529                 CERROR("Not have memory for update objid\n");
530                 OBD_FREE(*lmm, rc);
531                 *lmm = NULL;
532                 GOTO(out_oti, rc = -ENOMEM);
533         }
534         *lmm_size = rc;
535         rc = 0;
536         EXIT;
537 out_oti:
538         oti_free_cookies(oti);
539 out_ids:
540         if (lsm)
541                 obd_free_memmd(lov_exp, &lsm);
542
543         return rc;
544 }
545
546 /*
547  * used when destroying orphans and from mds_reint_unlink() when MDS wants to
548  * destroy objects on OSS.
549  */
550 static
551 int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
552                       struct mdd_object *obj, struct lu_attr *la,
553                       struct lov_mds_md *lmm, int lmm_size,
554                       struct llog_cookie *logcookies,
555                       int log_unlink)
556 {
557         struct obd_device     *obd = mdd2obd_dev(mdd);
558         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
559         struct lov_stripe_md  *lsm = NULL;
560         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
561         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
562         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
563         int rc;
564         ENTRY;
565
566         if (lmm_size == 0)
567                 RETURN(0);
568
569         rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
570         if (rc < 0) {
571                 CERROR("Error unpack md %p\n", lmm);
572                 RETURN(rc);
573         } else {
574                 LASSERT(rc >= sizeof(*lsm));
575                 rc = 0;
576         }
577
578         oa->o_id = lsm->lsm_object_id;
579         oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
580         oa->o_mode = la->la_mode & S_IFMT;
581         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
582
583         oti_init(oti, NULL);
584         if (log_unlink && logcookies) {
585                 oa->o_valid |= OBD_MD_FLCOOKIE;
586                 oti->oti_logcookies = logcookies;
587         }
588
589         CDEBUG(D_INFO, "destroying OSS object %d/%d\n",
590                         (int)oa->o_id, (int)oa->o_gr);
591
592         rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, NULL);
593
594         obd_free_memmd(lov_exp, &lsm);
595         RETURN(rc);
596 }
597
598 /*
599  * called with obj not locked. 
600  */
601
602 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
603                     struct mdd_object *obj, struct lu_attr *la)
604 {
605         struct md_attr    *ma = &mdd_env_info(env)->mti_ma;
606         int                rc;
607         ENTRY;
608
609         if (unlikely(la->la_nlink != 0)) {
610                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
611                       la->la_nlink);
612                 RETURN(0);
613         }
614
615         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
616         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
617         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
618         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
619         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
620                 RETURN(rc = -ENOMEM);
621
622         /* get lov ea */
623
624         rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
625                                XATTR_NAME_LOV);
626
627         if (rc <= 0) {
628                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
629                          PFID(mdo2fid(obj)), rc);
630                 if (rc == 0)
631                         rc = -ENOENT;
632                 RETURN(rc);
633         }
634
635         ma->ma_valid = MA_LOV;
636         
637         rc = mdd_unlink_log(env, mdd, obj, ma);
638         if (rc) {
639                 CWARN("mds unlink log for "DFID" failed: %d\n",
640                        PFID(mdo2fid(obj)), rc);
641                 RETURN(rc);
642         }
643
644         if (ma->ma_valid & MA_COOKIE)
645                 rc = mdd_lovobj_unlink(env, mdd, obj, la,
646                                        ma->ma_lmm, ma->ma_lmm_size,
647                                        ma->ma_cookie, 1);
648         RETURN(rc);
649 }
650
651 int mdd_log_op_unlink(struct obd_device *obd,
652                       struct lov_mds_md *lmm, int lmm_size,
653                       struct llog_cookie *logcookies, int cookies_size)
654 {
655         struct mds_obd *mds = &obd->u.mds;
656         struct lov_stripe_md *lsm = NULL;
657         struct llog_unlink_rec *lur;
658         struct llog_ctxt *ctxt;
659         int rc;
660         ENTRY;
661
662         if (IS_ERR(mds->mds_osc_obd))
663                 RETURN(PTR_ERR(mds->mds_osc_obd));
664
665         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
666         if (rc < 0)
667                 RETURN(rc);
668         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
669         if (rc)
670                 GOTO(out, rc);
671         /* first prepare unlink log record */
672         OBD_ALLOC(lur, sizeof(*lur));
673         if (!lur)
674                 GOTO(out, rc = -ENOMEM);
675         lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur);
676         lur->lur_hdr.lrh_type = MDS_UNLINK_REC;
677
678         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
679         rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
680                       cookies_size / sizeof(struct llog_cookie));
681         llog_ctxt_put(ctxt);
682
683         OBD_FREE(lur, sizeof(*lur));
684         GOTO(out, rc);
685 out:
686         obd_free_memmd(mds->mds_osc_exp, &lsm);
687         return rc;
688 }
689
690 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
691                    struct mdd_object *mdd_cobj, struct md_attr *ma)
692 {
693         struct obd_device *obd = mdd2obd_dev(mdd);
694
695         LASSERT(ma->ma_valid & MA_LOV);
696
697         if ((ma->ma_cookie_size > 0) &&
698             (mdd_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size,
699                                ma->ma_cookie, ma->ma_cookie_size) > 0)) {
700                 ma->ma_valid |= MA_COOKIE;
701         }
702         return 0;
703 }
704
705 int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
706                       struct lov_mds_md *lmm, int lmm_size,
707                       struct llog_cookie *logcookies, int cookies_size)
708 {
709         struct mds_obd *mds = &obd->u.mds;
710         struct lov_stripe_md *lsm = NULL;
711         struct llog_setattr64_rec *lsr;
712         struct llog_ctxt *ctxt;
713         int rc;
714         ENTRY;
715
716         if (IS_ERR(mds->mds_osc_obd))
717                 RETURN(PTR_ERR(mds->mds_osc_obd));
718
719         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
720         if (rc < 0)
721                 RETURN(rc);
722
723         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
724         if (rc)
725                 GOTO(out, rc);
726
727         OBD_ALLOC(lsr, sizeof(*lsr));
728         if (!lsr)
729                 GOTO(out, rc = -ENOMEM);
730
731         /* prepare setattr log record */
732         lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
733         lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC;
734         lsr->lsr_uid = uid;
735         lsr->lsr_gid = gid;
736
737         /* write setattr log */
738         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
739         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
740                       cookies_size / sizeof(struct llog_cookie));
741
742         llog_ctxt_put(ctxt);
743
744         OBD_FREE(lsr, sizeof(*lsr));
745  out:
746         obd_free_memmd(mds->mds_osc_exp, &lsm);
747         RETURN(rc);
748 }
749
750 int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd,
751                     const struct md_attr *ma,
752                     struct lov_mds_md *lmm, int lmm_size,
753                     struct llog_cookie *logcookies, int cookies_size)
754 {
755         struct obd_device *obd = mdd2obd_dev(mdd);
756
757         /* journal chown/chgrp in llog, just like unlink */
758         if (lmm_size > 0) {
759                 CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n",
760                         (unsigned long)ma->ma_attr.la_uid,
761                         (unsigned long)ma->ma_attr.la_gid);
762                 return mdd_log_op_setattr(obd, ma->ma_attr.la_uid,
763                                           ma->ma_attr.la_gid, lmm,
764                                           lmm_size, logcookies,
765                                           cookies_size);
766         } else
767                 return 0;
768 }
769
770 static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
771                           struct lov_mds_md *lmm, int lmm_size,
772                           struct llog_cookie *logcookies, __u64 id, __u32 gen,
773                           struct obd_capa *oc)
774 {
775         struct mds_obd *mds = &obd->u.mds;
776         struct obd_trans_info oti = { 0 };
777         struct obd_info oinfo = { { { 0 } } };
778         int rc;
779         ENTRY;
780
781         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR))
782                 RETURN(0);
783
784         /* first get memory EA */
785         OBDO_ALLOC(oinfo.oi_oa);
786         if (!oinfo.oi_oa)
787                 RETURN(-ENOMEM);
788
789         LASSERT(lmm);
790
791         rc = obd_unpackmd(mds->mds_osc_exp, &oinfo.oi_md, lmm, lmm_size);
792         if (rc < 0) {
793                 CERROR("Error unpack md %p for inode "LPU64"\n", lmm, id);
794                 GOTO(out, rc);
795         }
796
797         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, oinfo.oi_md);
798         if (rc) {
799                 CERROR("Error revalidate lsm %p \n", oinfo.oi_md);
800                 GOTO(out, rc);
801         }
802
803         /* then fill oa */
804         oinfo.oi_oa->o_uid = uid;
805         oinfo.oi_oa->o_gid = gid;
806         oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id;
807         oinfo.oi_oa->o_gr = oinfo.oi_md->lsm_object_gr;
808         oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
809                                 OBD_MD_FLUID | OBD_MD_FLGID;
810         if (logcookies) {
811                 oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE;
812                 oti.oti_logcookies = logcookies;
813         }
814
815         oinfo.oi_oa->o_fid = id;
816         oinfo.oi_oa->o_generation = gen;
817         oinfo.oi_oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
818         oinfo.oi_capa = oc;
819
820         /* do async setattr from mds to ost not waiting for responses. */
821         rc = obd_setattr_async(mds->mds_osc_exp, &oinfo, &oti, NULL);
822         if (rc)
823                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
824                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
825 out:
826         if (oinfo.oi_md)
827                 obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md);
828         OBDO_FREE(oinfo.oi_oa);
829         RETURN(rc);
830 }
831
832 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
833                           struct lov_mds_md *lmm, int lmm_size,
834                           struct llog_cookie *logcookies)
835 {
836         struct mdd_device   *mdd = mdo2mdd(&obj->mod_obj);
837         struct obd_device   *obd = mdd2obd_dev(mdd);
838         struct lu_attr      *tmp_la = &mdd_env_info(env)->mti_la;
839         const struct lu_fid *fid = mdd_object_fid(obj);
840         int rc = 0;
841         ENTRY;
842
843         mdd_read_lock(env, obj, MOR_TGT_CHILD);
844         rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj));
845         mdd_read_unlock(env, obj);
846         if (rc)
847                 RETURN(rc);
848
849         rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
850                                    lmm_size, logcookies, fid_seq(fid),
851                                    fid_oid(fid), NULL);
852         RETURN(rc);
853 }