Whamcloud - gitweb
6044429abc6e5c0a431dc30163dc4d1619055e6c
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: wangdi <wangdi@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <obd_lov.h>
55 #include <lprocfs_status.h>
56 #include <lustre_mds.h>
57 #include <lustre_fid.h>
58 #include <lustre/lustre_idl.h>
59
60 #include "mdd_internal.h"
61
62 static int mdd_notify(struct obd_device *host, struct obd_device *watched,
63                       enum obd_notify_event ev, void *owner)
64 {
65         struct mdd_device *mdd = owner;
66         int rc = 0;
67         ENTRY;
68
69         LASSERT(owner != NULL);
70         switch (ev)
71         {
72                 case OBD_NOTIFY_ACTIVE:
73                 case OBD_NOTIFY_SYNC:
74                 case OBD_NOTIFY_SYNC_NONBLOCK:
75                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_SYNC);
76                         break;
77                 case OBD_NOTIFY_CONFIG:
78                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_CONFIG);
79                         break;
80 #ifdef HAVE_QUOTA_SUPPORT
81                 case OBD_NOTIFY_QUOTA:
82                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_QUOTA);
83                         break;
84 #endif
85                 default:
86                         CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
87         }
88
89         RETURN(rc);
90 }
91
92 /* The obd is created for handling data stack for mdd */
93 int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
94                  struct lustre_cfg *cfg)
95 {
96         char                   *dev = lustre_cfg_string(cfg, 0);
97         int                     rc, name_size, uuid_size;
98         char                   *name, *uuid;
99         __u32                   mds_id;
100         struct lustre_cfg_bufs *bufs;
101         struct lustre_cfg      *lcfg;
102         struct obd_device      *obd;
103         ENTRY;
104
105         mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
106         name_size = strlen(MDD_OBD_NAME) + 35;
107         uuid_size = strlen(MDD_OBD_UUID) + 35;
108
109         OBD_ALLOC(name, name_size);
110         OBD_ALLOC(uuid, uuid_size);
111         if (name == NULL || uuid == NULL)
112                 GOTO(cleanup_mem, rc = -ENOMEM);
113
114         OBD_ALLOC_PTR(bufs);
115         if (!bufs)
116                 GOTO(cleanup_mem, rc = -ENOMEM);
117
118         snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s-%d",
119                  MDD_OBD_NAME, dev, mds_id);
120
121         snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s-%d",
122                  MDD_OBD_UUID, dev, mds_id);
123
124         lustre_cfg_bufs_reset(bufs, name);
125         lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE);
126         lustre_cfg_bufs_set_string(bufs, 2, uuid);
127         lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */);
128         lustre_cfg_bufs_set_string(bufs, 4, (char*)dev);
129
130         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
131         OBD_FREE_PTR(bufs);
132         if (!lcfg)
133                 GOTO(cleanup_mem, rc = -ENOMEM);
134
135         rc = class_attach(lcfg);
136         if (rc)
137                 GOTO(lcfg_cleanup, rc);
138
139         obd = class_name2obd(name);
140         if (!obd) {
141                 CERROR("Can not find obd %s\n", MDD_OBD_NAME);
142                 LBUG();
143         }
144
145         obd->obd_recovering = 1;
146         obd->u.mds.mds_id = mds_id;
147         rc = class_setup(obd, lcfg);
148         if (rc)
149                 GOTO(class_detach, rc);
150
151         /*
152          * Add here for obd notify mechanism, when adding a new ost, the mds
153          * will notify this mdd. The mds will be used for quota also.
154          */
155         obd->obd_upcall.onu_upcall = mdd_notify;
156         obd->obd_upcall.onu_owner = mdd;
157         mdd->mdd_obd_dev = obd;
158         EXIT;
159 class_detach:
160         if (rc)
161                 class_detach(obd, lcfg);
162 lcfg_cleanup:
163         lustre_cfg_free(lcfg);
164 cleanup_mem:
165         if (name)
166                 OBD_FREE(name, name_size);
167         if (uuid)
168                 OBD_FREE(uuid, uuid_size);
169         return rc;
170 }
171
172 int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd,
173                  struct lustre_cfg *lcfg)
174 {
175         struct obd_device      *obd;
176         int rc;
177         ENTRY;
178
179         obd = mdd2obd_dev(mdd);
180         LASSERT(obd);
181
182         rc = class_cleanup(obd, lcfg);
183         if (rc)
184                 GOTO(lcfg_cleanup, rc);
185
186         obd->obd_upcall.onu_upcall = NULL;
187         obd->obd_upcall.onu_owner = NULL;
188         rc = class_detach(obd, lcfg);
189         if (rc)
190                 GOTO(lcfg_cleanup, rc);
191         mdd->mdd_obd_dev = NULL;
192
193         EXIT;
194 lcfg_cleanup:
195         return rc;
196 }
197
198 int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
199                void *md, int *md_size, const char *name)
200 {
201         int rc;
202         ENTRY;
203
204         rc = mdo_xattr_get(env, obj, mdd_buf_get(env, md, *md_size), name,
205                            mdd_object_capa(env, obj));
206         /*
207          * XXX: Handling of -ENODATA, the right way is to have ->do_md_get()
208          * exported by dt layer.
209          */
210         if (rc == 0 || rc == -ENODATA) {
211                 *md_size = 0;
212                 rc = 0;
213         } else if (rc < 0) {
214                 CERROR("Error %d reading eadata - %d\n", rc, *md_size);
215         } else {
216                 /* XXX: Convert lov EA but fixed after verification test. */
217                 *md_size = rc;
218         }
219
220         RETURN(rc);
221 }
222
223 int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
224                       void *md, int *md_size, const char *name)
225 {
226         int rc = 0;
227         mdd_read_lock(env, obj, MOR_TGT_CHILD);
228         rc = mdd_get_md(env, obj, md, md_size, name);
229         mdd_read_unlock(env, obj);
230         return rc;
231 }
232
233 static int mdd_lov_set_stripe_md(const struct lu_env *env,
234                                  struct mdd_object *obj, struct lu_buf *buf,
235                                  struct thandle *handle)
236 {
237         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
238         struct obd_device       *obd = mdd2obd_dev(mdd);
239         struct obd_export       *lov_exp = obd->u.mds.mds_osc_exp;
240         struct lov_stripe_md    *lsm = NULL;
241         int rc;
242         ENTRY;
243
244         LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
245         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0,
246                            &lsm, buf->lb_buf);
247         if (rc)
248                 RETURN(rc);
249         obd_free_memmd(lov_exp, &lsm);
250
251         rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle);
252
253         CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
254         RETURN(rc);
255 }
256
257 /*
258  * Permission check is done before call it,
259  * no need check again.
260  */
261 static int mdd_lov_set_dir_md(const struct lu_env *env,
262                               struct mdd_object *obj, struct lu_buf *buf,
263                               struct thandle *handle)
264 {
265         struct lov_user_md *lum = NULL;
266         int rc = 0;
267         ENTRY;
268
269         LASSERT(S_ISDIR(mdd_object_type(obj)));
270         lum = (struct lov_user_md*)buf->lb_buf;
271
272         /* if { size, offset, count } = { 0, -1, 0 } and no pool (i.e. all default
273          * values specified) then delete default striping from dir. */
274         if (lum->lmm_stripe_size == 0 && lum->lmm_stripe_count == 0 &&
275             lum->lmm_stripe_offset == (typeof(lum->lmm_stripe_offset))(-1) &&
276             lum->lmm_magic != LOV_USER_MAGIC_V3) {
277                 rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL,
278                                        XATTR_NAME_LOV, 0, handle);
279                 if (rc == -ENODATA)
280                         rc = 0;
281                 CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
282                                 PFID(mdo2fid(obj)), rc);
283         } else {
284                 rc = mdd_lov_set_stripe_md(env, obj, buf, handle);
285         }
286         RETURN(rc);
287 }
288
289 int mdd_lsm_sanity_check(const struct lu_env *env,  struct mdd_object *obj)
290 {
291         struct lu_attr   *tmp_la = &mdd_env_info(env)->mti_la;
292         struct md_ucred  *uc     = md_ucred(env);
293         int rc;
294         ENTRY;
295
296         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
297         if (rc)
298                 RETURN(rc);
299
300         if ((uc->mu_fsuid != tmp_la->la_uid) &&
301             !mdd_capable(uc, CFS_CAP_FOWNER))
302                 rc = mdd_permission_internal_locked(env, obj, tmp_la,
303                                                     MAY_WRITE, MOR_TGT_CHILD);
304
305         RETURN(rc);
306 }
307
308 int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
309                    struct mdd_object *child, struct lov_mds_md *lmmp,
310                    int lmm_size, struct thandle *handle, int set_stripe)
311 {
312         struct lu_buf *buf;
313         umode_t mode;
314         int rc = 0;
315         ENTRY;
316
317         buf = mdd_buf_get(env, lmmp, lmm_size);
318         mode = mdd_object_type(child);
319         if (S_ISREG(mode) && lmm_size > 0) {
320                 if (set_stripe) {
321                         rc = mdd_lov_set_stripe_md(env, child, buf, handle);
322                 } else {
323                         rc = mdd_xattr_set_txn(env, child, buf,
324                                                XATTR_NAME_LOV, 0, handle);
325                 }
326         } else if (S_ISDIR(mode)) {
327                 if (lmmp == NULL && lmm_size == 0) {
328                         struct mdd_device *mdd = mdd_obj2mdd_dev(child);
329                         struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd);
330                         int size = sizeof(struct lov_mds_md_v3);
331
332                         /* Get parent dir stripe and set */
333                         if (pobj != NULL)
334                                 rc = mdd_get_md_locked(env, pobj, lmm, &size,
335                                                        XATTR_NAME_LOV);
336                         if (rc > 0) {
337                                 buf = mdd_buf_get(env, lmm, size);
338                                 rc = mdd_xattr_set_txn(env, child, buf,
339                                                XATTR_NAME_LOV, 0, handle);
340                                 if (rc)
341                                         CERROR("error on copy stripe info: rc "
342                                                 "= %d\n", rc);
343                         }
344                 } else {
345                         LASSERT(lmmp != NULL && lmm_size > 0);
346                         rc = mdd_lov_set_dir_md(env, child, buf, handle);
347                 }
348         }
349         CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n",
350                         lmmp, lmm_size, PFID(mdo2fid(child)), rc);
351         RETURN(rc);
352 }
353
354 /*
355  * XXX: this is for create lsm object id, which should identify the lsm object
356  * unique in the whole mds, as I see. But it seems, we still not need it
357  * now. Right? So just borrow the ll_fid_build_ino().
358  */
359 static obd_id mdd_lov_create_id(const struct lu_fid *fid)
360 {
361         return fid_flatten(fid);
362 }
363
364 int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
365 {
366         /* copy mds_lov code is using wrong layer */
367         return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
368 }
369
370 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
371 {
372         /* copy mds_lov code is using wrong layer */
373         mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
374 }
375
376 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
377                            struct lov_mds_md *lmm, int lmm_size,
378                            const struct md_op_spec *spec)
379 {
380         if (lmm && !spec->no_create)
381                 OBD_FREE(lmm, lmm_size);
382 }
383
384 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
385                    struct mdd_object *parent, struct mdd_object *child,
386                    struct lov_mds_md **lmm, int *lmm_size,
387                    const struct md_op_spec *spec, struct lu_attr *la)
388 {
389         struct obd_device     *obd = mdd2obd_dev(mdd);
390         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
391         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
392         struct obdo           *oa;
393         struct lov_stripe_md  *lsm = NULL;
394         const void            *eadata = spec->u.sp_ea.eadata;
395         __u32                  create_flags = spec->sp_cr_flags;
396         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
397         int                    rc = 0;
398         ENTRY;
399
400         if (!md_should_create(create_flags)) {
401                 *lmm_size = 0;
402                 RETURN(0);
403         }
404         oti_init(oti, NULL);
405
406         /* replay case, has objects already, only get lov from eadata */
407         if (spec->no_create != 0) {
408                 *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
409                 *lmm_size = spec->u.sp_ea.eadatalen;
410                 RETURN(0);
411         }
412
413         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO))
414                 GOTO(out_ids, rc = -ENOMEM);
415
416         LASSERT(lov_exp != NULL);
417         oa = &mdd_env_info(env)->mti_oa;
418
419         oa->o_uid = 0; /* must have 0 uid / gid on OST */
420         oa->o_gid = 0;
421         oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
422         oa->o_mode = S_IFREG | 0600;
423         oa->o_id = mdd_lov_create_id(mdd_object_fid(child));
424         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
425                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
426         oa->o_size = 0;
427
428         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
429                 if (create_flags & MDS_OPEN_HAS_EA) {
430                         LASSERT(eadata != NULL);
431                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
432                                            0, &lsm, (void*)eadata);
433                         if (rc)
434                                 GOTO(out_oti, rc);
435                         lsm->lsm_object_id = oa->o_id;
436                         lsm->lsm_object_gr = oa->o_gr;
437                 } else if (parent != NULL) {
438                         /* get lov ea from parent and set to lov */
439                         struct lov_mds_md *_lmm;
440                         int _lmm_size;
441
442                         _lmm_size = mdd_lov_mdsize(env, mdd);
443                         _lmm = mdd_max_lmm_get(env, mdd);
444
445                         if (_lmm == NULL)
446                                 GOTO(out_oti, rc = -ENOMEM);
447
448                         rc = mdd_get_md_locked(env, parent, _lmm,
449                                                &_lmm_size,
450                                                XATTR_NAME_LOV);
451                         if (rc > 0)
452                                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
453                                                    lov_exp, *lmm_size,
454                                                    &lsm, _lmm);
455
456                         if (rc)
457                                 GOTO(out_oti, rc);
458                 }
459
460                 rc = obd_create(lov_exp, oa, &lsm, oti);
461                 if (rc) {
462                         if (rc > 0) {
463                                 CERROR("Create error for "DFID": %d\n",
464                                        PFID(mdo2fid(child)), rc);
465                                 rc = -EIO;
466                         }
467                         GOTO(out_oti, rc);
468                 }
469                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
470         } else {
471                 LASSERT(eadata != NULL);
472                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
473                                    (void*)eadata);
474                 if (rc)
475                         GOTO(out_oti, rc);
476                 lsm->lsm_object_id = oa->o_id;
477                 lsm->lsm_object_gr = oa->o_gr;
478         }
479
480         /*
481          * Sometimes, we may truncate some object(without lsm) then open it
482          * (with write flags), so creating lsm above.  The Nonzero(truncated)
483          * size should tell ost, since size attr is in charge by OST.
484          */
485         if (la->la_size && la->la_valid & LA_SIZE) {
486                 struct obd_info *oinfo = &mdd_env_info(env)->mti_oi;
487
488                 memset(oinfo, 0, sizeof(*oinfo));
489
490                 /* When setting attr to ost, FLBKSZ is not needed. */
491                 oa->o_valid &= ~OBD_MD_FLBLKSZ;
492                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
493                              OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
494
495                 /*
496                  * XXX: Pack lustre id to OST, in OST, it will be packed by
497                  * filter_fid, but can not see what is the usages. So just pack
498                  * o_seq o_ver here, maybe fix it after this cycle.
499                  */
500                 oa->o_fid = fid_seq(mdd_object_fid(child));
501                 oa->o_generation = fid_oid(mdd_object_fid(child));
502                 oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
503                 oinfo->oi_oa = oa;
504                 oinfo->oi_md = lsm;
505                 oinfo->oi_capa = NULL;
506                 oinfo->oi_policy.l_extent.start = la->la_size;
507                 oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF;
508
509                 rc = obd_punch_rqset(lov_exp, oinfo, oti);
510                 if (rc) {
511                         CERROR("Error setting attrs for "DFID": rc %d\n",
512                                PFID(mdo2fid(child)), rc);
513                         if (rc > 0) {
514                                 CERROR("obd_setattr for "DFID" rc %d\n",
515                                         PFID(mdo2fid(child)), rc);
516                                 rc = -EIO;
517                         }
518                         GOTO(out_oti, rc);
519                 }
520         }
521
522         /* blksize should be changed after create data object */
523         la->la_valid |= LA_BLKSIZE;
524         la->la_blksize = oa->o_blksize;
525         *lmm = NULL;
526         rc = obd_packmd(lov_exp, lmm, lsm);
527         if (rc < 0) {
528                 CERROR("Cannot pack lsm, err = %d\n", rc);
529                 GOTO(out_oti, rc);
530         }
531         if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
532                 CERROR("Not have memory for update objid\n");
533                 OBD_FREE(*lmm, rc);
534                 *lmm = NULL;
535                 GOTO(out_oti, rc = -ENOMEM);
536         }
537         *lmm_size = rc;
538         rc = 0;
539         EXIT;
540 out_oti:
541         oti_free_cookies(oti);
542 out_ids:
543         if (lsm)
544                 obd_free_memmd(lov_exp, &lsm);
545
546         return rc;
547 }
548
549 /*
550  * used when destroying orphans and from mds_reint_unlink() when MDS wants to
551  * destroy objects on OSS.
552  */
553 static
554 int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
555                       struct mdd_object *obj, struct lu_attr *la,
556                       struct lov_mds_md *lmm, int lmm_size,
557                       struct llog_cookie *logcookies,
558                       int log_unlink)
559 {
560         struct obd_device     *obd = mdd2obd_dev(mdd);
561         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
562         struct lov_stripe_md  *lsm = NULL;
563         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
564         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
565         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
566         int rc;
567         ENTRY;
568
569         if (lmm_size == 0)
570                 RETURN(0);
571
572         rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
573         if (rc < 0) {
574                 CERROR("Error unpack md %p\n", lmm);
575                 RETURN(rc);
576         } else {
577                 LASSERT(rc >= sizeof(*lsm));
578                 rc = 0;
579         }
580
581         oa->o_id = lsm->lsm_object_id;
582         oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
583         oa->o_mode = la->la_mode & S_IFMT;
584         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
585
586         oti_init(oti, NULL);
587         if (log_unlink && logcookies) {
588                 oa->o_valid |= OBD_MD_FLCOOKIE;
589                 oti->oti_logcookies = logcookies;
590         }
591
592         CDEBUG(D_INFO, "destroying OSS object %d/%d\n",
593                         (int)oa->o_id, (int)oa->o_gr);
594
595         rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, NULL);
596
597         obd_free_memmd(lov_exp, &lsm);
598         RETURN(rc);
599 }
600
601 /*
602  * called with obj not locked. 
603  */
604
605 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
606                     struct mdd_object *obj, struct lu_attr *la)
607 {
608         struct md_attr    *ma = &mdd_env_info(env)->mti_ma;
609         int                rc;
610         ENTRY;
611
612         if (unlikely(la->la_nlink != 0)) {
613                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
614                       la->la_nlink);
615                 RETURN(0);
616         }
617
618         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
619         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
620         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
621         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
622         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
623                 RETURN(rc = -ENOMEM);
624
625         /* get lov ea */
626
627         rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
628                                XATTR_NAME_LOV);
629
630         if (rc <= 0) {
631                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
632                          PFID(mdo2fid(obj)), rc);
633                 if (rc == 0)
634                         rc = -ENOENT;
635                 RETURN(rc);
636         }
637
638         ma->ma_valid = MA_LOV;
639         
640         rc = mdd_unlink_log(env, mdd, obj, ma);
641         if (rc) {
642                 CWARN("mds unlink log for "DFID" failed: %d\n",
643                        PFID(mdo2fid(obj)), rc);
644                 RETURN(rc);
645         }
646
647         if (ma->ma_valid & MA_COOKIE)
648                 rc = mdd_lovobj_unlink(env, mdd, obj, la,
649                                        ma->ma_lmm, ma->ma_lmm_size,
650                                        ma->ma_cookie, 1);
651         RETURN(rc);
652 }
653
654 int mdd_log_op_unlink(struct obd_device *obd,
655                       struct lov_mds_md *lmm, int lmm_size,
656                       struct llog_cookie *logcookies, int cookies_size)
657 {
658         struct mds_obd *mds = &obd->u.mds;
659         struct lov_stripe_md *lsm = NULL;
660         struct llog_unlink_rec *lur;
661         struct llog_ctxt *ctxt;
662         int rc;
663         ENTRY;
664
665         if (IS_ERR(mds->mds_osc_obd))
666                 RETURN(PTR_ERR(mds->mds_osc_obd));
667
668         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
669         if (rc < 0)
670                 RETURN(rc);
671         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
672         if (rc)
673                 GOTO(out, rc);
674         /* first prepare unlink log record */
675         OBD_ALLOC(lur, sizeof(*lur));
676         if (!lur)
677                 GOTO(out, rc = -ENOMEM);
678         lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur);
679         lur->lur_hdr.lrh_type = MDS_UNLINK_REC;
680
681         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
682         rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
683                       cookies_size / sizeof(struct llog_cookie));
684         llog_ctxt_put(ctxt);
685
686         OBD_FREE(lur, sizeof(*lur));
687         GOTO(out, rc);
688 out:
689         obd_free_memmd(mds->mds_osc_exp, &lsm);
690         return rc;
691 }
692
693 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
694                    struct mdd_object *mdd_cobj, struct md_attr *ma)
695 {
696         struct obd_device *obd = mdd2obd_dev(mdd);
697
698         LASSERT(ma->ma_valid & MA_LOV);
699
700         if ((ma->ma_cookie_size > 0) &&
701             (mdd_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size,
702                                ma->ma_cookie, ma->ma_cookie_size) > 0)) {
703                 CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n",
704                        PFID(mdd_object_fid(mdd_cobj)));
705                 ma->ma_valid |= MA_COOKIE;
706         }
707         return 0;
708 }
709
710 int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
711                       struct lov_mds_md *lmm, int lmm_size,
712                       struct llog_cookie *logcookies, int cookies_size)
713 {
714         struct mds_obd *mds = &obd->u.mds;
715         struct lov_stripe_md *lsm = NULL;
716         struct llog_setattr64_rec *lsr;
717         struct llog_ctxt *ctxt;
718         int rc;
719         ENTRY;
720
721         if (IS_ERR(mds->mds_osc_obd))
722                 RETURN(PTR_ERR(mds->mds_osc_obd));
723
724         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
725         if (rc < 0)
726                 RETURN(rc);
727
728         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
729         if (rc)
730                 GOTO(out, rc);
731
732         OBD_ALLOC(lsr, sizeof(*lsr));
733         if (!lsr)
734                 GOTO(out, rc = -ENOMEM);
735
736         /* prepare setattr log record */
737         lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
738         lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC;
739         lsr->lsr_uid = uid;
740         lsr->lsr_gid = gid;
741
742         /* write setattr log */
743         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
744         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
745                       cookies_size / sizeof(struct llog_cookie));
746
747         llog_ctxt_put(ctxt);
748
749         OBD_FREE(lsr, sizeof(*lsr));
750  out:
751         obd_free_memmd(mds->mds_osc_exp, &lsm);
752         RETURN(rc);
753 }
754
755 int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd,
756                     const struct md_attr *ma,
757                     struct lov_mds_md *lmm, int lmm_size,
758                     struct llog_cookie *logcookies, int cookies_size)
759 {
760         struct obd_device *obd = mdd2obd_dev(mdd);
761
762         /* journal chown/chgrp in llog, just like unlink */
763         if (lmm_size > 0) {
764                 CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n",
765                         (unsigned long)ma->ma_attr.la_uid,
766                         (unsigned long)ma->ma_attr.la_gid);
767                 return mdd_log_op_setattr(obd, ma->ma_attr.la_uid,
768                                           ma->ma_attr.la_gid, lmm,
769                                           lmm_size, logcookies,
770                                           cookies_size);
771         } else
772                 return 0;
773 }
774
775 static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
776                           struct lov_mds_md *lmm, int lmm_size,
777                           struct llog_cookie *logcookies, __u64 id, __u32 gen,
778                           struct obd_capa *oc)
779 {
780         struct mds_obd *mds = &obd->u.mds;
781         struct obd_trans_info oti = { 0 };
782         struct obd_info oinfo = { { { 0 } } };
783         int rc;
784         ENTRY;
785
786         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR))
787                 RETURN(0);
788
789         /* first get memory EA */
790         OBDO_ALLOC(oinfo.oi_oa);
791         if (!oinfo.oi_oa)
792                 RETURN(-ENOMEM);
793
794         LASSERT(lmm);
795
796         rc = obd_unpackmd(mds->mds_osc_exp, &oinfo.oi_md, lmm, lmm_size);
797         if (rc < 0) {
798                 CERROR("Error unpack md %p for inode "LPU64"\n", lmm, id);
799                 GOTO(out, rc);
800         }
801
802         rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, oinfo.oi_md);
803         if (rc) {
804                 CERROR("Error revalidate lsm %p \n", oinfo.oi_md);
805                 GOTO(out, rc);
806         }
807
808         /* then fill oa */
809         oinfo.oi_oa->o_uid = uid;
810         oinfo.oi_oa->o_gid = gid;
811         oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id;
812         oinfo.oi_oa->o_gr = oinfo.oi_md->lsm_object_gr;
813         oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
814                                 OBD_MD_FLUID | OBD_MD_FLGID;
815         if (logcookies) {
816                 oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE;
817                 oti.oti_logcookies = logcookies;
818         }
819
820         oinfo.oi_oa->o_fid = id;
821         oinfo.oi_oa->o_generation = gen;
822         oinfo.oi_oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
823         oinfo.oi_capa = oc;
824
825         /* do async setattr from mds to ost not waiting for responses. */
826         rc = obd_setattr_async(mds->mds_osc_exp, &oinfo, &oti, NULL);
827         if (rc)
828                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
829                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
830 out:
831         if (oinfo.oi_md)
832                 obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md);
833         OBDO_FREE(oinfo.oi_oa);
834         RETURN(rc);
835 }
836
837 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
838                           struct lov_mds_md *lmm, int lmm_size,
839                           struct llog_cookie *logcookies)
840 {
841         struct mdd_device   *mdd = mdo2mdd(&obj->mod_obj);
842         struct obd_device   *obd = mdd2obd_dev(mdd);
843         struct lu_attr      *tmp_la = &mdd_env_info(env)->mti_la;
844         const struct lu_fid *fid = mdd_object_fid(obj);
845         int rc = 0;
846         ENTRY;
847
848         mdd_read_lock(env, obj, MOR_TGT_CHILD);
849         rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj));
850         mdd_read_unlock(env, obj);
851         if (rc)
852                 RETURN(rc);
853
854         rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
855                                    lmm_size, logcookies, fid_seq(fid),
856                                    fid_oid(fid), NULL);
857         RETURN(rc);
858 }