Whamcloud - gitweb
2f190d3dfef60ec5ce3e7fc7075f95474e032084
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *           wangdi <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <obd_lov.h>
41 #include <lprocfs_status.h>
42
43 #include <lu_object.h>
44 #include <md_object.h>
45 #include <dt_object.h>
46 #include <lustre_mds.h>
47
48 #include "mdd_internal.h"
49
50 static const char mdd_lov_objid_name[] = "lov_objid";
51
52 static int mdd_lov_read_objids(const struct lu_context *ctxt,
53                                struct mdd_device *mdd)
54 {
55         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
56         struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
57         struct lu_attr *lu_attr = &mdd_ctx_info(ctxt)->mti_attr;
58         obd_id *ids;
59         int i, rc;
60         ENTRY;
61
62         LASSERT(!lov_info->mdd_lov_objids_size);
63         LASSERT(!lov_info->mdd_lov_objids_dirty);
64
65         /* Read everything in the file, even if our current lov desc
66            has fewer targets. Old targets not in the lov descriptor
67            during mds setup may still have valid objids. */
68
69         rc = obj_ids->do_ops->do_attr_get(ctxt, obj_ids, lu_attr);
70         if (rc)
71                 GOTO(out, rc);
72
73         if (lu_attr->la_size == 0)
74                 GOTO(out, rc);
75
76         OBD_ALLOC(ids, lu_attr->la_size);
77         if (ids == NULL)
78                 RETURN(-ENOMEM);
79
80         lov_info->mdd_lov_objids = ids;
81         lov_info->mdd_lov_objids_size = lu_attr->la_size;
82
83 #if 0
84         rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids,
85                                             lu_attr->la_size, &off);
86         if (rc < 0) {
87                 CERROR("Error reading objids %d\n", rc);
88                 RETURN(rc);
89         }
90 #endif
91         lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
92
93         for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
94                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95                        lov_info->mdd_lov_objids[i], i);
96         }
97 out:
98         RETURN(0);
99 }
100
101 /* Update the lov desc for a new size lov. */
102 static int mdd_lov_update_desc(const struct lu_context *ctxt,
103                                struct mdd_device *mdd)
104 {
105         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
106         __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc);
107         struct lov_desc *ld;
108         struct obd_device *lov_obd = lov_info->mdd_lov_obd;
109         int rc = 0;
110         ENTRY;
111
112         ld = &mdd_ctx_info(ctxt)->mti_ld;
113
114         rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1,
115                           KEY_LOVDESC, &valsize, ld);
116         if (rc)
117                 GOTO(out, rc);
118
119         /* The size of the LOV target table may have increased. */
120         size = ld->ld_tgt_count * sizeof(obd_id);
121         if ((lov_info->mdd_lov_objids_size == 0) ||
122             (size > lov_info->mdd_lov_objids_size)) {
123                 obd_id *ids;
124
125                 /* add room by powers of 2 */
126                 size = 1;
127                 while (size < ld->ld_tgt_count)
128                         size = size << 1;
129                 size = size * sizeof(obd_id);
130
131                 OBD_ALLOC(ids, size);
132                 if (ids == NULL)
133                         GOTO(out, rc = -ENOMEM);
134                 memset(ids, 0, size);
135                 if (lov_info->mdd_lov_objids_size) {
136                         obd_id *old_ids = lov_info->mdd_lov_objids;
137                         memcpy(ids, lov_info->mdd_lov_objids,
138                                lov_info->mdd_lov_objids_size);
139                         lov_info->mdd_lov_objids = ids;
140                         OBD_FREE(old_ids, lov_info->mdd_lov_objids_size);
141                 }
142                 lov_info->mdd_lov_objids = ids;
143                 lov_info->mdd_lov_objids_size = size;
144         }
145
146         /* Don't change the mds_lov_desc until the objids size matches the
147            count (paranoia) */
148         lov_info->mdd_lov_desc = *ld;
149         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
150                lov_info->mdd_lov_desc.ld_tgt_count);
151
152         stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
153                       max(lov_info->mdd_lov_desc.ld_tgt_count,
154                           lov_info->mdd_lov_objids_in_file));
155         mdd->mdd_max_mdsize = lov_mds_md_size(stripes);
156         mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie);
157         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
158                mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize);
159 out:
160         RETURN(rc);
161 }
162
163 int mdd_lov_write_objids(const struct lu_context *ctxt,
164                          struct mdd_lov_info *lov_info)
165 {
166         int i, rc = 0, tgts;
167         ENTRY;
168
169         if (!lov_info->mdd_lov_objids_dirty)
170                 RETURN(0);
171
172         tgts = max(lov_info->mdd_lov_desc.ld_tgt_count,
173                    lov_info->mdd_lov_objids_in_file);
174         if (!tgts)
175                 RETURN(0);
176
177         for (i = 0; i < tgts; i++)
178                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
179                        lov_info->mdd_lov_objids[i], i);
180 #if 0
181         rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
182                                              lov_info->mdd_lov_objids,
183                                              tgts * sizeof(obd_id), &off);
184         if (rc >= 0) {
185                 lov_info->mdd_lov_objids_dirty = 0;
186                 rc = 0;
187         }
188 #endif
189         RETURN(rc);
190 }
191
192 static int mdd_lov_connect(const struct lu_context *ctxt,
193                            struct mdd_device *mdd, char *lov_name)
194 {
195         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
196         struct lustre_handle conn = {0,};
197         struct obd_connect_data *data;
198         int rc = 0;
199         ENTRY;
200
201         /*connect to obd*/
202         OBD_ALLOC(data, sizeof(*data));
203         if (data == NULL)
204                 RETURN(-ENOMEM);
205         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
206                                   OBD_CONNECT_REQPORTAL;
207         data->ocd_version = LUSTRE_VERSION_CODE;
208         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
209         rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid,
210                          data);
211         OBD_FREE(data, sizeof(*data));
212         if (rc) {
213                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
214                 lov_info->mdd_lov_obd = ERR_PTR(rc);
215                 RETURN(rc);
216         }
217
218         /* open and test the lov objd file */
219
220         rc = mdd_lov_read_objids(ctxt, mdd);
221         if (rc) {
222                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
223                 GOTO(out, rc);
224         }
225
226         rc = mdd_lov_update_desc(ctxt, mdd);
227         if (rc)
228                 GOTO(out, rc);
229 #if 0
230         /* tgt_count may be 0! */
231         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
232         if (rc) {
233                 CERROR("failed to initialize catalog %d\n", rc);
234                 GOTO(err_reg, rc);
235         }
236 #endif
237         /* If we're mounting this code for the first time on an existing FS,
238          * we need to populate the objids array from the real OST values */
239         if (lov_info->mdd_lov_desc.ld_tgt_count >
240                       lov_info->mdd_lov_objids_in_file) {
241                 int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count;
242                 int i;
243
244                 rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export,
245                                   strlen("last_id"), "last_id", &size,
246                                   lov_info->mdd_lov_objids);
247                 if (!rc) {
248                         for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++)
249                                 CWARN("got last object "LPU64" from OST %d\n",
250                                       lov_info->mdd_lov_objids[i], i);
251                         lov_info->mdd_lov_objids_dirty = 1;
252                         rc = mdd_lov_write_objids(ctxt, lov_info);
253                         if (rc)
254                                 CERROR("got last objids from OSTs, but error "
255                                        "writing objids file: %d\n", rc);
256                 }
257         }
258         /* I want to see a callback happen when the OBD moves to a
259          * "For General Use" state, and that's when we'll call
260          * set_nextid().  The class driver can help us here, because
261          * it can use the obd_recovering flag to determine when the
262          * the OBD is full available. */
263 #if 0
264         if (!obd->obd_recovering)
265                 rc = mds_postrecov(obd);
266 #endif
267 out:
268         if (rc)
269                 obd_disconnect(lov_info->mdd_lov_obd->obd_self_export);
270         RETURN(rc);
271 }
272
273 int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd)
274 {
275         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
276
277         dt_object_fini(lov_info->mdd_lov_objid_obj);
278         return 0;
279 }
280
281 int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
282                  struct lustre_cfg *cfg)
283 {
284         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
285         struct dt_object *obj_id;
286         struct obd_device *obd = NULL;
287         char *lov_name = NULL, *srv = NULL;
288         int rc = 0;
289         ENTRY;
290
291         if (IS_ERR(lov_info->mdd_lov_obd))
292                 RETURN(PTR_ERR(lov_info->mdd_lov_obd));
293
294         lov_name = lustre_cfg_string(cfg, 3);
295         LASSERTF(lov_name != NULL, "MDD need lov \n");
296         lov_info->mdd_lov_obd = class_name2obd(lov_name);
297         if (!lov_info->mdd_lov_obd) {
298                 CERROR("MDS cannot locate LOV %s\n", lov_name);
299                 lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN);
300                 RETURN(-ENOTCONN);
301         }
302
303         obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name,
304                                &lov_info->mdd_lov_objid_fid);
305         if (IS_ERR(obj_id)){
306                 rc = PTR_ERR(obj_id);
307                 RETURN(rc);
308         }
309
310         LASSERT(obj_id != NULL);
311         lov_info->mdd_lov_objid_obj = obj_id;
312
313         obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1));
314
315         rc = mdd_lov_connect(ctxt, mdd, lov_name);
316         if (rc)
317                 GOTO(out, rc);
318
319         /*register the obd server for lov*/
320         srv = lustre_cfg_string(cfg, 0);
321         obd = class_name2obd(srv);
322         if (obd == NULL) {
323                 CERROR("No such OBD %s\n", srv);
324                 LBUG();
325         }
326         rc = obd_register_observer(lov_info->mdd_lov_obd, obd);
327         if (rc) {
328                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
329                        lov_name, rc);
330                 GOTO(out, rc);
331         }
332         EXIT;
333 out:
334         if (rc)
335                 mdd_lov_fini(ctxt, mdd);
336         return rc;
337 }
338
339 /* update the LOV-OSC knowledge of the last used object id's */
340 int mdd_lov_set_nextid(struct mdd_device *mdd)
341 {
342         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
343         int rc;
344         ENTRY;
345
346         LASSERT(lov_info->mdd_lov_objids != NULL);
347
348         rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export,
349                                 strlen(KEY_NEXT_ID), KEY_NEXT_ID,
350                                 lov_info->mdd_lov_desc.ld_tgt_count,
351                                 lov_info->mdd_lov_objids, NULL);
352
353         if (rc)
354                 CERROR ("mdd_lov_set_nextid failed (%d)\n", rc);
355
356         RETURN(rc);
357 }
358
359 struct mdd_lov_sync_info {
360         struct lu_context *mlsi_ctxt;
361         struct lu_device  *mlsi_ld;     /* the lov device to sync */
362         struct obd_device *mlsi_watched; /* target osc */
363         __u32              mlsi_index;   /* index of target */
364 };
365
366 #define MDSLOV_NO_INDEX -1
367
368 /* Inform MDS about new/updated target */
369 static int mdd_lov_update_mds(struct lu_context *ctxt,
370                               struct lu_device *ld,
371                               struct obd_device *watched,
372                               __u32 idx)
373 {
374         struct mdd_device *mdd = lu2mdd_dev(ld);
375         struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
376         int old_count;
377         int rc = 0;
378         ENTRY;
379
380         old_count = lov_info->mdd_lov_desc.ld_tgt_count;
381         rc = mdd_lov_update_desc(ctxt, mdd);
382         if (rc)
383                 RETURN(rc);
384
385         /*
386          * idx is set as data from lov_notify.
387          * XXX did not consider recovery here
388          */
389         if (idx != MDSLOV_NO_INDEX) {
390                 if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) {
391                         CERROR("index %d > count %d!\n", idx,
392                                lov_info->mdd_lov_desc.ld_tgt_count);
393                         RETURN(-EINVAL);
394                 }
395
396                 if (idx >= lov_info->mdd_lov_objids_in_file) {
397                         /* We never read this lastid; ask the osc */
398                         obd_id lastid;
399                         __u32 size = sizeof(lastid);
400                         rc = obd_get_info(watched->obd_self_export,
401                                           strlen("last_id"),
402                                           "last_id", &size, &lastid);
403                         if (rc)
404                                 RETURN(rc);
405                         lov_info->mdd_lov_objids[idx] = lastid;
406                         lov_info->mdd_lov_objids_dirty = 1;
407                         mdd_lov_write_objids(ctxt, lov_info);
408                 } else {
409                         /* We have read this lastid from disk; tell the osc.
410                            Don't call this during recovery. */
411                         rc = mdd_lov_set_nextid(mdd);
412                 }
413
414                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
415                       lov_info->mdd_lov_objids[idx], idx);
416         }
417
418         RETURN(rc);
419 }
420
421 static int mdd_lov_clear_orphans(struct mdd_lov_info *mli,
422                                  struct obd_uuid *ost_uuid)
423 {
424         int rc;
425         struct obdo oa;
426         struct obd_trans_info oti = {0};
427         struct lov_stripe_md  *empty_ea = NULL;
428         ENTRY;
429
430         LASSERT(mli->mdd_lov_objids != NULL);
431
432         /* This create will in fact either create or destroy:  If the OST is
433          * missing objects below this ID, they will be created.  If it finds
434          * objects above this ID, they will be removed. */
435         memset(&oa, 0, sizeof(oa));
436         oa.o_valid = OBD_MD_FLFLAGS;
437         oa.o_flags = OBD_FL_DELORPHAN;
438         if (ost_uuid != NULL) {
439                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
440                 oa.o_valid |= OBD_MD_FLINLINE;
441         }
442         rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa,
443                         &empty_ea, &oti);
444
445         RETURN(rc);
446 }
447
448 /* We only sync one osc at a time, so that we don't have to hold
449    any kind of lock on the whole mds_lov_desc, which may change
450    (grow) as a result of mds_lov_add_ost.  This also avoids any
451    kind of mismatch between the lov_desc and the mds_lov_desc,
452    which are not in lock-step during lov_add_obd */
453 static int __mdd_lov_synchronize(void *data)
454 {
455         struct mdd_lov_sync_info *mlsi = data;
456         struct lu_device *ld = mlsi->mlsi_ld;
457         struct obd_device *watched = mlsi->mlsi_watched;
458         struct lu_context *ctxt = mlsi->mlsi_ctxt;
459         struct mdd_device *mdd = lu2mdd_dev(ld);
460         struct obd_uuid *uuid;
461         __u32  idx = mlsi->mlsi_index;
462         int rc = 0;
463         ENTRY;
464
465         OBD_FREE(mlsi, sizeof(*mlsi));
466
467         LASSERT(ld);
468         LASSERT(watched);
469         uuid = &watched->u.cli.cl_target_uuid;
470         LASSERT(uuid);
471
472         rc = mdd_lov_update_mds(ctxt, ld, watched, idx);
473         if (rc != 0)
474                 GOTO(out, rc);
475
476         rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
477                                 strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
478                                 NULL);
479         if (rc != 0) {
480                 CERROR("failed at obd_set_info_async: %d\n", rc);
481                 GOTO(out, rc);
482         }
483
484         rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid);
485         if (rc != 0) {
486                 CERROR("failed at mds_lov_clear_orphans: %d\n", rc);
487                 GOTO(out, rc);
488         }
489 out:
490         EXIT;
491         lu_device_put(ld);
492         return rc;
493 }
494
495 int mdd_lov_synchronize(void *data)
496 {
497         struct mdd_lov_sync_info *mlsi = data;
498         char name[20];
499
500         sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index);
501         ptlrpc_daemonize(name);
502
503         RETURN(__mdd_lov_synchronize(data));
504 }
505
506 int mdd_lov_start_synchronize(const struct lu_context *ctxt,
507                               struct lu_device *ld,
508                               struct obd_device *watched,
509                               void *data, int nonblock)
510 {
511         struct mdd_lov_sync_info *mlsi;
512         int rc;
513
514         ENTRY;
515
516         LASSERT(watched);
517
518         OBD_ALLOC(mlsi, sizeof(*mlsi));
519         if (mlsi == NULL)
520                 RETURN(-ENOMEM);
521
522         mlsi->mlsi_ctxt = (struct lu_context *)ctxt;
523         mlsi->mlsi_ld = ld;
524         mlsi->mlsi_watched = watched;
525         if (data)
526                 mlsi->mlsi_index = *(__u32 *)data;
527         else
528                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
529
530         /* Although class_export_get(obd->obd_self_export) would lock
531            the MDS in place, since it's only a self-export
532            it doesn't lock the LOV in place.  The LOV can be disconnected
533            during MDS precleanup, leaving nothing for __mdd_lov_synchronize.
534            Simply taking an export ref on the LOV doesn't help, because it's
535            still disconnected. Taking an obd reference insures that we don't
536            disconnect the LOV.  This of course means a cleanup won't
537            finish for as long as the sync is blocking. */
538         lu_device_get(ld);
539 #if 0
540         if (nonblock) {
541                 /* Synchronize in the background */
542                 rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi,
543                                        CLONE_VM | CLONE_FILES);
544                 if (rc < 0) {
545                         CERROR("error starting mdd_lov_synchronize: %d\n", rc);
546                         lu_device_put(ld);
547                 } else {
548                         CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n",
549                                mlsi->mlsi_index, rc);
550                         rc = 0;
551                 }
552         } else {
553                 rc = __mdd_lov_synchronize((void *)mlsi);
554         }
555 #else
556         /*FIXME: Did not implement the nonblock lov sync here. because ctxt can not
557          * be shared, maybe we need ref_count for ctxt */
558         rc = __mdd_lov_synchronize((void *)mlsi);
559 #endif
560         RETURN(rc);
561 }
562
563 int mdd_notify(const struct lu_context *ctxt, struct lu_device *ld,
564                struct obd_device *watched, enum obd_notify_event ev,
565                void *data)
566 {
567         struct mdd_device *mdd = lu2mdd_dev(ld);
568         struct obd_device *obd = ld->ld_site->ls_top_dev->ld_obd;
569         int rc = 0;
570         ENTRY;
571
572         switch (ev) {
573         /* We only handle these: */
574         case OBD_NOTIFY_ACTIVE:
575         case OBD_NOTIFY_SYNC:
576         case OBD_NOTIFY_SYNC_NONBLOCK:
577                 break;
578         default:
579                 RETURN(0);
580         }
581
582         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
583
584         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
585                 CERROR("unexpected notification of %s %s!\n",
586                        watched->obd_type->typ_name, watched->obd_name);
587                 RETURN(-EINVAL);
588         }
589
590         /*FIXME later, Recovery stuff still not be designed */
591         if (obd->obd_recovering) {
592                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
593                       obd->obd_name,
594                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
595                 /* We still have to fix the lov descriptor for ost's added
596                    after the mdt in the config log. They didn't make it into
597                    mds_lov_connect. */
598                 rc = mdd_lov_update_desc(ctxt, mdd);
599                 RETURN(rc);
600         }
601
602         rc = mdd_lov_start_synchronize(ctxt, ld, watched, data,
603                                        !(ev == OBD_NOTIFY_SYNC));
604         RETURN(rc);
605 }
606
607 static int mdd_get_md(const struct lu_context *ctxt, struct md_object *obj,
608                       void *md, int *md_size, int lock)
609 {
610         struct dt_object *next;
611         int rc = 0;
612         int lmm_size;
613
614         next = mdd_object_child(md2mdd_obj(obj));
615         rc = next->do_ops->do_xattr_get(ctxt, next, md, *md_size,
616                                         MDS_LOV_MD_NAME);
617         if (rc < 0) {
618                 CERROR("Error %d reading eadata \n", rc);
619         } else if (rc > 0) {
620                 lmm_size = rc;
621                 /*FIXME convert lov EA necessary for this version?*/
622                 *md_size = lmm_size;
623                 rc = lmm_size;
624         } else {
625                 *md_size = 0;
626         }
627
628         RETURN (rc);
629 }
630
631 int mdd_lov_set_md(const struct lu_context *ctxt, struct md_object *pobj,
632                    struct md_object *child)
633 {
634         struct dt_object *next = mdd_object_child(md2mdd_obj(child));
635         int rc = 0;
636         ENTRY;
637
638         if (dt_is_dir(ctxt, next)) {
639                 struct lov_mds_md *lmm = &mdd_ctx_info(ctxt)->mti_lmm;
640                 int lmm_size = sizeof(lmm);
641                 rc = mdd_get_md(ctxt, pobj, &lmm, &lmm_size, 1);
642                 if (rc > 0) {
643                         rc = mdd_xattr_set(ctxt, child, lmm, lmm_size, MDS_LOV_MD_NAME);
644                         if (rc)
645                                 CERROR("error on copy stripe info: rc = %d\n", rc);
646                 }
647         }
648         RETURN(rc);
649 }
650
651 int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
652                    struct mdd_object *child)
653 {
654         struct mdd_lov_info *mli = &mdd->mdd_lov_info;
655         struct obdo *oa;
656         struct lov_mds_md *lmm = NULL;
657         struct lov_stripe_md *lsm = NULL;
658         int rc = 0, lmm_size;
659         ENTRY;
660
661         oa = obdo_alloc();
662
663         oa->o_uid = 0; /* must have 0 uid / gid on OST */
664         oa->o_gid = 0;
665         oa->o_mode = S_IFREG | 0600;
666         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
667                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
668         oa->o_size = 0;
669
670         rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL);
671         if (rc)
672                 GOTO(out_oa, rc);
673
674         rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm);
675         if (rc < 0) {
676                 CERROR("cannot pack lsm, err = %d\n", rc);
677                 GOTO(out_oa, rc);
678         }
679         lmm_size = rc;
680
681         rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size,
682                            MDS_LOV_MD_NAME);
683 out_oa:
684         obdo_free(oa);
685         RETURN(rc);
686 }