Whamcloud - gitweb
LU-365 Update copyright for files modified by Whamcloud
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_lov.c
40  *
41  * Lustre Metadata Server (mds) handling of striped file data
42  *
43  * Author: Peter Braam <braam@clusterfs.com>
44  * Author: wangdi <wangdi@clusterfs.com>
45  */
46
47 #ifndef EXPORT_SYMTAB
48 # define EXPORT_SYMTAB
49 #endif
50 #define DEBUG_SUBSYSTEM S_MDS
51
52 #include <linux/module.h>
53 #include <obd.h>
54 #include <obd_class.h>
55 #include <lustre_ver.h>
56 #include <obd_support.h>
57 #include <obd_lov.h>
58 #include <lprocfs_status.h>
59 #include <lustre_mds.h>
60 #include <lustre_fid.h>
61 #include <lustre/lustre_idl.h>
62
63 #include "mdd_internal.h"
64
65 static int mdd_notify(struct obd_device *host, struct obd_device *watched,
66                       enum obd_notify_event ev, void *owner, void *data)
67 {
68         struct mdd_device *mdd = owner;
69         int rc = 0;
70         ENTRY;
71
72         LASSERT(owner != NULL);
73         switch (ev)
74         {
75                 case OBD_NOTIFY_ACTIVE:
76                 case OBD_NOTIFY_SYNC:
77                 case OBD_NOTIFY_SYNC_NONBLOCK:
78                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
79                                           MD_LOV_SYNC, data);
80                         break;
81                 case OBD_NOTIFY_CONFIG:
82                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
83                                           MD_LOV_CONFIG, data);
84                         break;
85 #ifdef HAVE_QUOTA_SUPPORT
86                 case OBD_NOTIFY_QUOTA:
87                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
88                                           MD_LOV_QUOTA, data);
89                         break;
90 #endif
91                 default:
92                         CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
93         }
94
95         RETURN(rc);
96 }
97
98 /* The obd is created for handling data stack for mdd */
99 int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
100                  struct lustre_cfg *cfg)
101 {
102         char                   *dev = lustre_cfg_string(cfg, 0);
103         int                     rc, name_size, uuid_size;
104         char                   *name, *uuid;
105         __u32                   mds_id;
106         struct lustre_cfg_bufs *bufs;
107         struct lustre_cfg      *lcfg;
108         struct obd_device      *obd;
109         ENTRY;
110
111         mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
112         name_size = strlen(MDD_OBD_NAME) + 35;
113         uuid_size = strlen(MDD_OBD_UUID) + 35;
114
115         OBD_ALLOC(name, name_size);
116         OBD_ALLOC(uuid, uuid_size);
117         if (name == NULL || uuid == NULL)
118                 GOTO(cleanup_mem, rc = -ENOMEM);
119
120         OBD_ALLOC_PTR(bufs);
121         if (!bufs)
122                 GOTO(cleanup_mem, rc = -ENOMEM);
123
124         snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s",
125                  MDD_OBD_NAME, dev);
126
127         snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s",
128                  MDD_OBD_UUID, dev);
129
130         lustre_cfg_bufs_reset(bufs, name);
131         lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE);
132         lustre_cfg_bufs_set_string(bufs, 2, uuid);
133         lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */);
134         lustre_cfg_bufs_set_string(bufs, 4, (char*)dev);
135
136         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
137         OBD_FREE_PTR(bufs);
138         if (!lcfg)
139                 GOTO(cleanup_mem, rc = -ENOMEM);
140
141         rc = class_attach(lcfg);
142         if (rc)
143                 GOTO(lcfg_cleanup, rc);
144
145         obd = class_name2obd(name);
146         if (!obd) {
147                 CERROR("Can not find obd %s\n", MDD_OBD_NAME);
148                 LBUG();
149         }
150
151         cfs_spin_lock(&obd->obd_dev_lock);
152         obd->obd_recovering = 1;
153         cfs_spin_unlock(&obd->obd_dev_lock);
154         obd->u.mds.mds_id = mds_id;
155         rc = class_setup(obd, lcfg);
156         if (rc)
157                 GOTO(class_detach, rc);
158
159         /*
160          * Add here for obd notify mechanism, when adding a new ost, the mds
161          * will notify this mdd. The mds will be used for quota also.
162          */
163         obd->obd_upcall.onu_upcall = mdd_notify;
164         obd->obd_upcall.onu_owner = mdd;
165         mdd->mdd_obd_dev = obd;
166         EXIT;
167 class_detach:
168         if (rc)
169                 class_detach(obd, lcfg);
170 lcfg_cleanup:
171         lustre_cfg_free(lcfg);
172 cleanup_mem:
173         if (name)
174                 OBD_FREE(name, name_size);
175         if (uuid)
176                 OBD_FREE(uuid, uuid_size);
177         return rc;
178 }
179
180 int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd,
181                  struct lustre_cfg *lcfg)
182 {
183         struct obd_device      *obd;
184         int rc;
185         ENTRY;
186
187         obd = mdd2obd_dev(mdd);
188         LASSERT(obd);
189
190         rc = class_cleanup(obd, lcfg);
191         if (rc)
192                 GOTO(lcfg_cleanup, rc);
193
194         obd->obd_upcall.onu_upcall = NULL;
195         obd->obd_upcall.onu_owner = NULL;
196         rc = class_detach(obd, lcfg);
197         if (rc)
198                 GOTO(lcfg_cleanup, rc);
199         mdd->mdd_obd_dev = NULL;
200
201         EXIT;
202 lcfg_cleanup:
203         return rc;
204 }
205
206 int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
207                void *md, int *md_size, const char *name)
208 {
209         int rc;
210         ENTRY;
211
212         rc = mdo_xattr_get(env, obj, mdd_buf_get(env, md, *md_size), name,
213                            mdd_object_capa(env, obj));
214         /*
215          * XXX: Handling of -ENODATA, the right way is to have ->do_md_get()
216          * exported by dt layer.
217          */
218         if (rc == 0 || rc == -ENODATA) {
219                 *md_size = 0;
220                 rc = 0;
221         } else if (rc < 0) {
222                 CERROR("Error %d reading eadata - %d\n", rc, *md_size);
223         } else {
224                 /* XXX: Convert lov EA but fixed after verification test. */
225                 *md_size = rc;
226         }
227
228         RETURN(rc);
229 }
230
231 int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
232                       void *md, int *md_size, const char *name)
233 {
234         int rc = 0;
235         mdd_read_lock(env, obj, MOR_TGT_CHILD);
236         rc = mdd_get_md(env, obj, md, md_size, name);
237         mdd_read_unlock(env, obj);
238         return rc;
239 }
240
241 static int mdd_lov_set_stripe_md(const struct lu_env *env,
242                                  struct mdd_object *obj, struct lu_buf *buf,
243                                  struct thandle *handle)
244 {
245         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
246         struct obd_device       *obd = mdd2obd_dev(mdd);
247         struct obd_export       *lov_exp = obd->u.mds.mds_lov_exp;
248         struct lov_stripe_md    *lsm = NULL;
249         int rc;
250         ENTRY;
251
252         LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
253         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0,
254                            &lsm, buf->lb_buf);
255         if (rc)
256                 RETURN(rc);
257         obd_free_memmd(lov_exp, &lsm);
258
259         rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle);
260
261         CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
262         RETURN(rc);
263 }
264
265 /*
266  * Permission check is done before call it,
267  * no need check again.
268  */
269 static int mdd_lov_set_dir_md(const struct lu_env *env,
270                               struct mdd_object *obj, struct lu_buf *buf,
271                               struct thandle *handle)
272 {
273         struct lov_user_md *lum = NULL;
274         int rc = 0;
275         ENTRY;
276
277         LASSERT(S_ISDIR(mdd_object_type(obj)));
278         lum = (struct lov_user_md*)buf->lb_buf;
279
280         /* if { size, offset, count } = { 0, -1, 0 } and no pool (i.e. all default
281          * values specified) then delete default striping from dir. */
282         if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count,
283                                 lum->lmm_stripe_offset) &&
284             lum->lmm_magic != LOV_USER_MAGIC_V3) {
285                 rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL,
286                                        XATTR_NAME_LOV, 0, handle);
287                 if (rc == -ENODATA)
288                         rc = 0;
289                 CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
290                                 PFID(mdo2fid(obj)), rc);
291         } else {
292                 rc = mdd_lov_set_stripe_md(env, obj, buf, handle);
293         }
294         RETURN(rc);
295 }
296
297 int mdd_lsm_sanity_check(const struct lu_env *env,  struct mdd_object *obj)
298 {
299         struct lu_attr   *tmp_la = &mdd_env_info(env)->mti_la;
300         struct md_ucred  *uc     = md_ucred(env);
301         int rc;
302         ENTRY;
303
304         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
305         if (rc)
306                 RETURN(rc);
307
308         if ((uc->mu_fsuid != tmp_la->la_uid) &&
309             !mdd_capable(uc, CFS_CAP_FOWNER))
310                 rc = mdd_permission_internal_locked(env, obj, tmp_la,
311                                                     MAY_WRITE, MOR_TGT_CHILD);
312
313         RETURN(rc);
314 }
315
316 int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
317                    struct mdd_object *child, struct lov_mds_md *lmmp,
318                    int lmm_size, struct thandle *handle, int set_stripe)
319 {
320         struct lu_buf *buf;
321         cfs_umode_t mode;
322         int rc = 0;
323         ENTRY;
324
325         buf = mdd_buf_get(env, lmmp, lmm_size);
326         mode = mdd_object_type(child);
327         if (S_ISREG(mode) && lmm_size > 0) {
328                 if (set_stripe) {
329                         rc = mdd_lov_set_stripe_md(env, child, buf, handle);
330                 } else {
331                         rc = mdd_xattr_set_txn(env, child, buf,
332                                                XATTR_NAME_LOV, 0, handle);
333                 }
334         } else if (S_ISDIR(mode)) {
335                 if (lmmp == NULL && lmm_size == 0) {
336                         struct mdd_device *mdd = mdd_obj2mdd_dev(child);
337                         struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd);
338                         int size = sizeof(struct lov_mds_md_v3);
339
340                         /* Get parent dir stripe and set */
341                         if (pobj != NULL)
342                                 rc = mdd_get_md_locked(env, pobj, lmm, &size,
343                                                        XATTR_NAME_LOV);
344                         if (rc > 0) {
345                                 buf = mdd_buf_get(env, lmm, size);
346                                 rc = mdd_xattr_set_txn(env, child, buf,
347                                                XATTR_NAME_LOV, 0, handle);
348                                 if (rc)
349                                         CERROR("error on copy stripe info: rc "
350                                                 "= %d\n", rc);
351                         }
352                 } else {
353                         LASSERT(lmmp != NULL && lmm_size > 0);
354                         rc = mdd_lov_set_dir_md(env, child, buf, handle);
355                 }
356         }
357         CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n",
358                         lmmp, lmm_size, PFID(mdo2fid(child)), rc);
359         RETURN(rc);
360 }
361
362 int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
363 {
364         /* copy mds_lov code is using wrong layer */
365         return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
366 }
367
368 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
369 {
370         /* copy mds_lov code is using wrong layer */
371         mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
372 }
373
374 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
375                            struct lov_mds_md *lmm, int lmm_size,
376                            const struct md_op_spec *spec)
377 {
378         if (lmm && !spec->no_create)
379                 OBD_FREE(lmm, lmm_size);
380 }
381
382 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
383                    struct mdd_object *parent, struct mdd_object *child,
384                    struct lov_mds_md **lmm, int *lmm_size,
385                    const struct md_op_spec *spec, struct lu_attr *la)
386 {
387         struct obd_device     *obd = mdd2obd_dev(mdd);
388         struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
389         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
390         struct obdo           *oa;
391         struct lov_stripe_md  *lsm = NULL;
392         const void            *eadata = spec->u.sp_ea.eadata;
393         __u64                  create_flags = spec->sp_cr_flags;
394         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
395         int                    rc = 0;
396         ENTRY;
397
398         if (!md_should_create(create_flags)) {
399                 *lmm_size = 0;
400                 RETURN(0);
401         }
402         oti_init(oti, NULL);
403
404         /* replay case, has objects already, only get lov from eadata */
405         if (spec->no_create != 0) {
406                 *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
407                 *lmm_size = spec->u.sp_ea.eadatalen;
408                 if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count,
409                                                  (*lmm)->lmm_magic)) {
410                         RETURN(0);
411                 } else {
412                         CERROR("incorrect lsm received during recovery\n");
413                         RETURN(-EPROTO);
414                 }
415         }
416
417         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO))
418                 GOTO(out_ids, rc = -ENOMEM);
419
420         LASSERT(lov_exp != NULL);
421         oa = &mdd_env_info(env)->mti_oa;
422
423         oa->o_uid = 0; /* must have 0 uid / gid on OST */
424         oa->o_gid = 0;
425         oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
426         oa->o_mode = S_IFREG | 0600;
427         oa->o_id = fid_ver_oid(mdd_object_fid(child));
428         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
429                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
430         oa->o_size = 0;
431
432         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
433                 if (create_flags & MDS_OPEN_HAS_EA) {
434                         LASSERT(eadata != NULL);
435                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
436                                            0, &lsm, (void*)eadata);
437                         if (rc)
438                                 GOTO(out_oti, rc);
439                 } else if (parent != NULL) {
440                         /* get lov ea from parent and set to lov */
441                         struct lov_mds_md *_lmm;
442                         int _lmm_size;
443
444                         _lmm_size = mdd_lov_mdsize(env, mdd);
445                         _lmm = mdd_max_lmm_get(env, mdd);
446
447                         if (_lmm == NULL)
448                                 GOTO(out_oti, rc = -ENOMEM);
449
450                         rc = mdd_get_md_locked(env, parent, _lmm,
451                                                &_lmm_size,
452                                                XATTR_NAME_LOV);
453                         if (rc > 0)
454                                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
455                                                    lov_exp, *lmm_size,
456                                                    &lsm, _lmm);
457
458                         if (rc)
459                                 GOTO(out_oti, rc);
460                 }
461
462                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10);
463                 rc = obd_create(lov_exp, oa, &lsm, oti);
464                 if (rc) {
465                         if (rc > 0) {
466                                 CERROR("Create error for "DFID": %d\n",
467                                        PFID(mdo2fid(child)), rc);
468                                 rc = -EIO;
469                         }
470                         GOTO(out_oti, rc);
471                 }
472                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
473         } else {
474                 LASSERT(eadata != NULL);
475                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
476                                    (void*)eadata);
477                 if (rc)
478                         GOTO(out_oti, rc);
479
480         }
481
482         lsm->lsm_object_id = fid_ver_oid(mdd_object_fid(child));
483         lsm->lsm_object_seq = fid_seq(mdd_object_fid(child));
484         /*
485          * Sometimes, we may truncate some object(without lsm) then open it
486          * (with write flags), so creating lsm above.  The Nonzero(truncated)
487          * size should tell ost, since size attr is in charge by OST.
488          */
489         if (la->la_size && la->la_valid & LA_SIZE) {
490                 struct obd_info *oinfo = &mdd_env_info(env)->mti_oi;
491
492                 memset(oinfo, 0, sizeof(*oinfo));
493
494                 /* When setting attr to ost, FLBKSZ is not needed. */
495                 oa->o_valid &= ~OBD_MD_FLBLKSZ;
496                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
497                              OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
498
499                 /*
500                  * XXX: Pack lustre id to OST, in OST, it will be packed by
501                  * filter_fid, but can not see what is the usages. So just pack
502                  * o_seq o_ver here, maybe fix it after this cycle.
503                  */
504                 obdo_from_inode(oa, NULL,
505                                 (struct lu_fid *)mdd_object_fid(child), 0);
506                 oinfo->oi_oa = oa;
507                 oinfo->oi_md = lsm;
508                 oinfo->oi_capa = NULL;
509                 oinfo->oi_policy.l_extent.start = la->la_size;
510                 oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF;
511
512                 rc = obd_punch_rqset(lov_exp, oinfo, oti);
513                 if (rc) {
514                         CERROR("Error setting attrs for "DFID": rc %d\n",
515                                PFID(mdo2fid(child)), rc);
516                         if (rc > 0) {
517                                 CERROR("obd_setattr for "DFID" rc %d\n",
518                                         PFID(mdo2fid(child)), rc);
519                                 rc = -EIO;
520                         }
521                         GOTO(out_oti, rc);
522                 }
523         }
524         /* blksize should be changed after create data object */
525         la->la_valid |= LA_BLKSIZE;
526         la->la_blksize = oa->o_blksize;
527         *lmm = NULL;
528         rc = obd_packmd(lov_exp, lmm, lsm);
529         if (rc < 0) {
530                 CERROR("Cannot pack lsm, err = %d\n", rc);
531                 GOTO(out_oti, rc);
532         }
533         if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
534                 CERROR("Not have memory for update objid\n");
535                 OBD_FREE(*lmm, rc);
536                 *lmm = NULL;
537                 GOTO(out_oti, rc = -ENOMEM);
538         }
539         *lmm_size = rc;
540         rc = 0;
541         EXIT;
542 out_oti:
543         oti_free_cookies(oti);
544 out_ids:
545         if (lsm)
546                 obd_free_memmd(lov_exp, &lsm);
547
548         return rc;
549 }
550
551 /*
552  * used when destroying orphans and from mds_reint_unlink() when MDS wants to
553  * destroy objects on OSS.
554  */
555 static
556 int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
557                       struct mdd_object *obj, struct lu_attr *la,
558                       struct lov_mds_md *lmm, int lmm_size,
559                       struct llog_cookie *logcookies,
560                       int log_unlink)
561 {
562         struct obd_device     *obd = mdd2obd_dev(mdd);
563         struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
564         struct lov_stripe_md  *lsm = NULL;
565         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
566         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
567         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
568         int rc;
569         ENTRY;
570
571         if (lmm_size == 0)
572                 RETURN(0);
573
574         rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
575         if (rc < 0) {
576                 CERROR("Error unpack md %p\n", lmm);
577                 RETURN(rc);
578         } else {
579                 LASSERT(rc >= sizeof(*lsm));
580                 rc = 0;
581         }
582
583         oa->o_id = lsm->lsm_object_id;
584         oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
585         oa->o_mode = la->la_mode & S_IFMT;
586         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
587
588         oti_init(oti, NULL);
589         if (log_unlink && logcookies) {
590                 oa->o_valid |= OBD_MD_FLCOOKIE;
591                 oti->oti_logcookies = logcookies;
592         }
593
594         CDEBUG(D_INFO, "destroying OSS object "LPU64":"LPU64"\n", oa->o_seq,
595                oa->o_id);
596
597         rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, NULL);
598
599         obd_free_memmd(lov_exp, &lsm);
600         RETURN(rc);
601 }
602
603 /*
604  * called with obj locked. 
605  */
606 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
607                     struct mdd_object *obj, struct lu_attr *la)
608 {
609         struct md_attr    *ma = &mdd_env_info(env)->mti_ma;
610         int                rc;
611         ENTRY;
612
613         LASSERT(mdd_write_locked(env, obj) != 0);
614
615         if (unlikely(!S_ISREG(mdd_object_type(obj))))
616                 RETURN(0);
617
618         if (unlikely(la->la_nlink != 0)) {
619                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
620                       la->la_nlink);
621                 RETURN(0);
622         }
623
624         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
625         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
626         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
627         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
628         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
629                 RETURN(rc = -ENOMEM);
630
631         /* get lov ea */
632
633         rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
634                         XATTR_NAME_LOV);
635
636         if (rc <= 0) {
637                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
638                          PFID(mdo2fid(obj)), rc);
639                 if (rc == 0)
640                         rc = -ENOENT;
641                 RETURN(rc);
642         }
643
644         ma->ma_valid = MA_LOV;
645
646         rc = mdd_unlink_log(env, mdd, obj, ma);
647         if (rc) {
648                 CWARN("mds unlink log for "DFID" failed: %d\n",
649                        PFID(mdo2fid(obj)), rc);
650                 RETURN(rc);
651         }
652
653         if (ma->ma_valid & MA_COOKIE)
654                 rc = mdd_lovobj_unlink(env, mdd, obj, la,
655                                        ma->ma_lmm, ma->ma_lmm_size,
656                                        ma->ma_cookie, 1);
657         RETURN(rc);
658 }
659
660 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
661                    struct mdd_object *mdd_cobj, struct md_attr *ma)
662 {
663         LASSERT(ma->ma_valid & MA_LOV);
664
665         if ((ma->ma_cookie_size > 0) &&
666             (mds_log_op_unlink(mdd2obd_dev(mdd), ma->ma_lmm, ma->ma_lmm_size,
667                                ma->ma_cookie, ma->ma_cookie_size) > 0)) {
668                 CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n",
669                        PFID(mdd_object_fid(mdd_cobj)));
670                 ma->ma_valid |= MA_COOKIE;
671         }
672         return 0;
673 }
674
675 int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
676                        struct lov_mds_md *lmm, int lmm_size,
677                        struct llog_cookie *logcookies, int cookies_size)
678 {
679         struct mds_obd *mds = &obd->u.mds;
680         struct lov_stripe_md *lsm = NULL;
681         struct llog_setattr64_rec *lsr;
682         struct llog_ctxt *ctxt;
683         int rc;
684         ENTRY;
685
686         if (IS_ERR(mds->mds_lov_obd))
687                 RETURN(PTR_ERR(mds->mds_lov_obd));
688
689         rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
690         if (rc < 0)
691                 RETURN(rc);
692
693         OBD_ALLOC(lsr, sizeof(*lsr));
694         if (!lsr)
695                 GOTO(out, rc = -ENOMEM);
696
697         /* prepare setattr log record */
698         lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
699         lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC;
700         lsr->lsr_uid = uid;
701         lsr->lsr_gid = gid;
702
703         /* write setattr log */
704         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
705         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
706                       cookies_size / sizeof(struct llog_cookie));
707
708         llog_ctxt_put(ctxt);
709
710         OBD_FREE(lsr, sizeof(*lsr));
711  out:
712         obd_free_memmd(mds->mds_lov_exp, &lsm);
713         RETURN(rc);
714 }
715
716 int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd,
717                     const struct md_attr *ma,
718                     struct lov_mds_md *lmm, int lmm_size,
719                     struct llog_cookie *logcookies, int cookies_size)
720 {
721         struct obd_device *obd = mdd2obd_dev(mdd);
722
723         /* journal chown/chgrp in llog, just like unlink */
724         if (lmm_size > 0) {
725                 CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n",
726                         (unsigned long)ma->ma_attr.la_uid,
727                         (unsigned long)ma->ma_attr.la_gid);
728                 return mdd_log_op_setattr(obd, ma->ma_attr.la_uid,
729                                           ma->ma_attr.la_gid, lmm,
730                                           lmm_size, logcookies,
731                                           cookies_size);
732         } else
733                 return 0;
734 }
735
736 static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
737                           struct lov_mds_md *lmm, int lmm_size,
738                           struct llog_cookie *logcookies, const struct lu_fid *parent,
739                           struct obd_capa *oc)
740 {
741         struct mds_obd *mds = &obd->u.mds;
742         struct obd_trans_info oti = { 0 };
743         struct obd_info oinfo = { { { 0 } } };
744         int rc;
745         ENTRY;
746
747         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR))
748                 RETURN(0);
749
750         /* first get memory EA */
751         OBDO_ALLOC(oinfo.oi_oa);
752         if (!oinfo.oi_oa)
753                 RETURN(-ENOMEM);
754
755         LASSERT(lmm);
756
757         rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size);
758         if (rc < 0) {
759                 CERROR("Error unpack md %p for obj "DFID"\n", lmm,
760                         PFID(parent));
761                 GOTO(out, rc);
762         }
763
764         /* then fill oa */
765         oinfo.oi_oa->o_uid = uid;
766         oinfo.oi_oa->o_gid = gid;
767         oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id;
768         oinfo.oi_oa->o_seq = oinfo.oi_md->lsm_object_seq;
769         oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
770                                 OBD_MD_FLUID | OBD_MD_FLGID;
771         if (logcookies) {
772                 oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE;
773                 oti.oti_logcookies = logcookies;
774         }
775
776         obdo_from_inode(oinfo.oi_oa, NULL, (struct lu_fid *)parent, 0);
777         oinfo.oi_capa = oc;
778
779         /* do async setattr from mds to ost not waiting for responses. */
780         rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL);
781         if (rc)
782                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
783                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
784 out:
785         if (oinfo.oi_md)
786                 obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md);
787         OBDO_FREE(oinfo.oi_oa);
788         RETURN(rc);
789 }
790
791 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
792                           struct lov_mds_md *lmm, int lmm_size,
793                           struct llog_cookie *logcookies)
794 {
795         struct mdd_device   *mdd = mdo2mdd(&obj->mod_obj);
796         struct obd_device   *obd = mdd2obd_dev(mdd);
797         struct lu_attr      *tmp_la = &mdd_env_info(env)->mti_la;
798         const struct lu_fid *fid = mdd_object_fid(obj);
799         int rc = 0;
800         ENTRY;
801
802         mdd_read_lock(env, obj, MOR_TGT_CHILD);
803         rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj));
804         mdd_read_unlock(env, obj);
805         if (rc)
806                 RETURN(rc);
807
808         rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
809                                    lmm_size, logcookies, fid, NULL);
810         RETURN(rc);
811 }
812
813 static int grouplock_blocking_ast(struct ldlm_lock *lock,
814                                   struct ldlm_lock_desc *desc,
815                                   void *data, int flag)
816 {
817         struct md_attr *ma = data;
818         struct lustre_handle lockh;
819         int rc = 0;
820         ENTRY;
821
822         switch (flag)
823         {
824                 case LDLM_CB_BLOCKING :
825                         /* lock is canceled */
826                         CDEBUG(D_DLMTRACE, "Lock %p is canceled\n", lock);
827
828                         ldlm_lock2handle(lock, &lockh);
829                         rc = ldlm_cli_cancel(&lockh);
830
831                         break;
832                 case LDLM_CB_CANCELING :
833                         CDEBUG(D_DLMTRACE,
834                                "Lock %p has been canceled, do cleaning\n",
835                                lock);
836
837                         if (ma && ma->ma_som)
838                                 OBD_FREE_PTR(ma->ma_som);
839                         if (ma)
840                                 OBD_FREE_PTR(ma);
841                         break;
842                 default:
843                         LBUG();
844         }
845         RETURN(rc);
846 }
847
848 static int grouplock_glimpse_ast(struct ldlm_lock *lock, void *data)
849 {
850         struct ptlrpc_request *req = data;
851         struct ost_lvb *lvb;
852         int rc;
853         struct md_attr *ma;
854         ENTRY;
855
856         ma = lock->l_ast_data;
857
858         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
859         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
860                              sizeof(*lvb));
861         rc = req_capsule_server_pack(&req->rq_pill);
862         if (rc) {
863                 CERROR("failed pack reply: %d\n", rc);
864                 GOTO(out, rc);
865         }
866
867         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
868
869         if ((ma) && (ma->ma_valid & MA_SOM)) {
870                 lvb->lvb_size = ma->ma_som->msd_size;
871                 lvb->lvb_blocks = ma->ma_som->msd_blocks;
872         } else if ((ma) && (ma->ma_valid & MA_INODE)) {
873                 lvb->lvb_size = ma->ma_attr.la_size;
874                 lvb->lvb_blocks = ma->ma_attr.la_blocks;
875         } else {
876                 lvb->lvb_size = 0;
877                 rc = -ELDLM_NO_LOCK_DATA;
878         }
879
880         EXIT;
881 out:
882         if (rc == -ELDLM_NO_LOCK_DATA)
883                 lustre_pack_reply(req, 1, NULL, NULL);
884
885         req->rq_status = rc;
886         return rc;
887 }
888
889 int mdd_file_lock(const struct lu_env *env, struct md_object *obj,
890                   struct lov_mds_md *lmm, struct ldlm_extent *extent,
891                   struct lustre_handle *lockh)
892 {
893         struct ldlm_enqueue_info einfo = { 0 };
894         struct obd_info oinfo = { { { 0 } } };
895         struct obd_device *obd;
896         struct obd_export *lov_exp;
897         struct lov_stripe_md *lsm = NULL;
898         struct md_attr *ma = NULL;
899         int rc;
900         ENTRY;
901
902         obd = mdo2mdd(obj)->mdd_obd_dev;
903         lov_exp = obd->u.mds.mds_lov_exp;
904
905         obd_unpackmd(lov_exp, &lsm, lmm,
906                      lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic));
907
908         OBD_ALLOC_PTR(ma);
909         if (ma == NULL)
910                 GOTO(out, rc = -ENOMEM);
911
912         OBD_ALLOC_PTR(ma->ma_som);
913         if (ma->ma_som == NULL)
914                 GOTO(out, rc = -ENOMEM);
915
916         ma->ma_need = MA_SOM | MA_INODE;
917         mo_attr_get(env, obj, ma);
918
919         einfo.ei_type = LDLM_EXTENT;
920         einfo.ei_mode = LCK_GROUP;
921         einfo.ei_cb_bl = grouplock_blocking_ast;
922         einfo.ei_cb_cp = ldlm_completion_ast;
923         einfo.ei_cb_gl = grouplock_glimpse_ast;
924
925         if (ma->ma_valid & (MA_SOM | MA_INODE))
926                 einfo.ei_cbdata = ma;
927         else
928                 einfo.ei_cbdata = NULL;
929
930         memset(&oinfo.oi_policy, 0, sizeof(oinfo.oi_policy));
931         oinfo.oi_policy.l_extent = *extent;
932         oinfo.oi_lockh = lockh;
933         oinfo.oi_md = lsm;
934         oinfo.oi_flags = 0;
935
936         rc = obd_enqueue(lov_exp, &oinfo, &einfo, NULL);
937         /* ei_cbdata is used as a free flag at exit */
938         if (rc)
939                 einfo.ei_cbdata = NULL;
940
941         obd_unpackmd(lov_exp, &lsm, NULL, 0);
942
943 out:
944         /* ma is freed if not used as callback data */
945         if ((einfo.ei_cbdata == NULL) && ma && ma->ma_som)
946                 OBD_FREE_PTR(ma->ma_som);
947         if ((einfo.ei_cbdata == NULL) && ma)
948                 OBD_FREE_PTR(ma);
949
950         RETURN(rc);
951 }
952
953 int mdd_file_unlock(const struct lu_env *env, struct md_object *obj,
954                     struct lov_mds_md *lmm, struct lustre_handle *lockh)
955 {
956         struct obd_device *obd;
957         struct obd_export *lov_exp;
958         struct lov_stripe_md *lsm = NULL;
959         int rc;
960         ENTRY;
961
962         LASSERT(lustre_handle_is_used(lockh));
963
964         obd = mdo2mdd(obj)->mdd_obd_dev;
965         lov_exp = obd->u.mds.mds_lov_exp;
966
967         obd_unpackmd(lov_exp, &lsm, lmm,
968                      lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic));
969
970         rc = obd_cancel(lov_exp, lsm, LCK_GROUP, lockh);
971
972         obd_unpackmd(lov_exp, &lsm, NULL, 0);
973
974         RETURN(rc);
975 }