Whamcloud - gitweb
7e640c824701686001cf0e3a9a46e5387a84ac11
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
52 #include <obd_lov.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
55
56 #include "mds_internal.h"
57
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
59
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         unsigned int i=0, j;
64
65         CDEBUG(D_INFO, "dump from %s\n", label);
66         if (mds->mds_lov_page_dirty == NULL) {
67                 CERROR("NULL bitmap!\n");
68                 GOTO(skip_bitmap, i);
69         }
70
71         for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i,j,data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107         if (mds->mds_lov_page_dirty == NULL)
108                 RETURN(-ENOMEM);
109
110
111         OBD_ALLOC(mds->mds_lov_page_array, size);
112         if (mds->mds_lov_page_array == NULL)
113                 GOTO(err_free_bitmap, rc = -ENOMEM);
114
115         /* open and test the lov objd file */
116         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
117         if (IS_ERR(file)) {
118                 rc = PTR_ERR(file);
119                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120                 GOTO(err_free, rc = PTR_ERR(file));
121         }
122         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124                        file->f_dentry->d_inode->i_mode);
125                 GOTO(err_open, rc = -ENOENT);
126         }
127         mds->mds_lov_objid_filp = file;
128
129         RETURN (0);
130 err_open:
131         if (filp_close((struct file *)file, 0))
132                 CERROR("can't close %s after error\n", LOV_OBJID);
133 err_free:
134         OBD_FREE(mds->mds_lov_page_array, size);
135 err_free_bitmap:
136         FREE_BITMAP(mds->mds_lov_page_dirty);
137
138         RETURN(rc);
139 }
140 EXPORT_SYMBOL(mds_lov_init_objids);
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
169
170 /**
171  * currently exist two ways for know about ost count and max ost index.
172  * first - after ost is connected to mds and sync process finished
173  * second - get from lmm in recovery process, in case when mds not have configs,
174  * and ost isn't registered in mgs.
175  *
176  * \param mds pointer to mds structure
177  * \param index maxium ost index
178  *
179  * \retval -ENOMEM is not hame memory for new page
180  * \retval 0 is update passed
181  */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
183 {
184         __u32 page = index / OBJID_PER_PAGE();
185         __u32 off = index % OBJID_PER_PAGE();
186         obd_id *data =  mds->mds_lov_page_array[page];
187
188         if (data == NULL) {
189                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
190                 if (data == NULL)
191                         RETURN(-ENOMEM);
192
193                 mds->mds_lov_page_array[page] = data;
194         }
195
196         if (index > mds->mds_lov_objid_max_index) {
197                 mds->mds_lov_objid_lastpage = page;
198                 mds->mds_lov_objid_lastidx = off;
199                 mds->mds_lov_objid_max_index = index;
200         }
201
202         /* workaround - New target not in objids file; increase mdsize */
203         /* ld_tgt_count is used as the max index everywhere, despite its name. */
204         if (data[off] == 0) {
205                 __u32 stripes;
206
207                 data[off] = 1;
208                 mds->mds_lov_objid_count++;
209                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210                                 mds->mds_lov_objid_count);
211
212                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215                        "stripes: %d/%d\n", mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize, stripes);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
224 {
225         int rc = 0;
226         __u32 j;
227         struct lov_ost_data_v1 *lmm_objects;
228
229         /* if we create file without objects - lmm is NULL */
230         if (lmm == NULL)
231                 return 0;
232
233         mutex_down(&obd->obd_dev_sem);
234         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
235                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
236         else
237                 lmm_objects = lmm->lmm_objects;
238
239         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
240                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
241                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
242                         rc = -ENOMEM;
243                         break;
244                 }
245         }
246         mutex_up(&obd->obd_dev_sem);
247
248         RETURN(rc);
249 }
250 EXPORT_SYMBOL(mds_lov_prepare_objids);
251
252 /*
253  * write llog orphan record about lost ost object,
254  * Special lsm is allocated with single stripe, caller should deallocated it
255  * after use
256  */
257 static int mds_log_lost_precreated(struct obd_device *obd,
258                                    struct lov_stripe_md **lsmp, int *stripes,
259                                    obd_id id, obd_count count, int idx)
260 {
261         struct lov_stripe_md *lsm = *lsmp;
262         int rc;
263         ENTRY;
264
265         if (*lsmp == NULL) {
266                 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
267                 if (rc < 0)
268                         RETURN(rc);
269                 /* need only one stripe, save old value */
270                 *stripes = lsm->lsm_stripe_count;
271                 lsm->lsm_stripe_count = 1;
272                 *lsmp = lsm;
273         }
274
275         lsm->lsm_oinfo[0]->loi_id = id;
276         lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
277         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
278
279         rc = mds_log_op_orphan(obd, lsm, count);
280         RETURN(rc);
281 }
282
283 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
284 {
285         struct mds_obd *mds = &obd->u.mds;
286         int j;
287         struct lov_ost_data_v1 *lmm_objects;
288 #ifndef HAVE_DELAYED_RECOVERY
289         struct lov_stripe_md *lsm = NULL;
290         int stripes = 0;
291 #endif
292         ENTRY;
293
294         /* if we create file without objects - lmm is NULL */
295         if (lmm == NULL)
296                 return;
297
298         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
299                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
300         else
301                 lmm_objects = lmm->lmm_objects;
302
303         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
304                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
305                 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
306                 __u32 page = i / OBJID_PER_PAGE();
307                 __u32 idx = i % OBJID_PER_PAGE();
308                 obd_id *data;
309
310                 data = mds->mds_lov_page_array[page];
311
312                 CDEBUG(D_INODE,"update last object for ost %u"
313                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
314                 if (id > data[idx]) {
315 #ifndef HAVE_DELAYED_RECOVERY
316                         int lost = id - data[idx] - 1;
317                         /* we might have lost precreated objects due to VBR */
318                         if (lost > 0 && obd->obd_recovering) {
319                                 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
320                                 if (!obd->obd_version_recov)
321                                         CWARN("Unexpected gap in objids\n");
322                                 /* lsm is allocated if NULL */
323                                 mds_log_lost_precreated(obd, &lsm, &stripes,
324                                                         data[idx] + 1, lost, i);
325                         }
326 #endif
327                         data[idx] = id;
328                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
329                 }
330         }
331 #ifndef HAVE_DELAYED_RECOVERY
332         if (lsm) {
333                 /* restore stripes number */
334                 lsm->lsm_stripe_count = stripes;
335                 obd_free_memmd(mds->mds_osc_exp, &lsm);
336         }
337 #endif
338         EXIT;
339         return;
340 }
341 EXPORT_SYMBOL(mds_lov_update_objids);
342
343 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
344                                     __u32 count)
345 {
346         __u32 i;
347         __u32 stripes;
348
349         for(i = 0; i < count; i++) {
350                 if (data[i] == 0)
351                         continue;
352
353                 mds->mds_lov_objid_count++;
354                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
355                                 mds->mds_lov_objid_count);
356
357                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
358                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
359                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
360                        "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
361         }
362         EXIT;
363         return 0;
364 }
365
366 static int mds_lov_read_objids(struct obd_device *obd)
367 {
368         struct mds_obd *mds = &obd->u.mds;
369         loff_t off = 0;
370         int i, rc = 0, count = 0, page = 0;
371         size_t size;
372         ENTRY;
373
374         /* Read everything in the file, even if our current lov desc
375            has fewer targets. Old targets not in the lov descriptor
376            during mds setup may still have valid objids. */
377         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
378         if (size == 0)
379                 RETURN(0);
380
381         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
382         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
383         for (i = 0; i < page; i++) {
384                 obd_id *data;
385                 loff_t off_old = off;
386
387                 LASSERT(mds->mds_lov_page_array[i] == NULL);
388                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
389                 if (mds->mds_lov_page_array[i] == NULL)
390                         GOTO(out, rc = -ENOMEM);
391
392                 data = mds->mds_lov_page_array[i];
393
394                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
395                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
396                 if (rc < 0) {
397                         CERROR("Error reading objids %d\n", rc);
398                         GOTO(out, rc);
399                 }
400
401                 count = (off - off_old) / sizeof(obd_id);
402                 if (mds_lov_update_from_read(mds, data, count)) {
403                         CERROR("Can't update mds data\n");
404                         GOTO(out, rc = -EIO);
405                 }
406
407                 if (off == off_old)
408                         break; /* eof */
409          }
410          mds->mds_lov_objid_lastpage = i;
411          mds->mds_lov_objid_lastidx = count;
412
413         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
414                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
415 out:
416         mds_lov_dump_objids("read",obd);
417
418         RETURN(rc);
419 }
420
421 int mds_lov_write_objids(struct obd_device *obd)
422 {
423         struct mds_obd *mds = &obd->u.mds;
424         int i = 0, rc = 0;
425         ENTRY;
426
427         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
428                 RETURN(0);
429
430         mds_lov_dump_objids("write", obd);
431
432         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
433                 obd_id *data =  mds->mds_lov_page_array[i];
434                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
435                 loff_t off = i * size;
436
437                 LASSERT(data != NULL);
438
439                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
440                         continue;
441
442                 /* check for particaly filled last page */
443                 if (i == mds->mds_lov_objid_lastpage)
444                         size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
445
446                 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
447                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
448                                          size, &off, 0);
449                 if (rc < 0) {
450                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
451                         break;
452                 }
453         }
454         if (rc >= 0)
455                 rc = 0;
456
457         RETURN(rc);
458 }
459 EXPORT_SYMBOL(mds_lov_write_objids);
460
461 static int mds_lov_get_objid(struct obd_device * obd,
462                              obd_id idx)
463 {
464         struct mds_obd *mds = &obd->u.mds;
465         struct obd_export *osc_exp = mds->mds_osc_exp;
466         unsigned int page;
467         unsigned int off;
468         obd_id *data;
469         int rc = 0;
470         ENTRY;
471
472         LASSERT(osc_exp != NULL);
473
474         page = idx / OBJID_PER_PAGE();
475         off = idx % OBJID_PER_PAGE();
476
477         data = mds->mds_lov_page_array[page];
478         if (data[off] < 2 || 
479             !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) {
480                 /* We never read this lastid; ask the osc */
481                 struct obd_id_info lastid;
482                 __u32 size = sizeof(lastid);
483
484                 lastid.idx = idx;
485                 lastid.data = &data[off];
486                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
487                                   KEY_LAST_ID, &size, &lastid, NULL);
488                 if (rc)
489                         GOTO(out, rc);
490
491                 /* workaround for clean filter */
492                 if (data[off] == 0)
493                         data[off] = 1;
494
495                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
496         }
497         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
498                idx, data, page, off, data[off]);
499 out:
500         RETURN(rc);
501 }
502
503 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
504 {
505         int rc;
506         struct obdo oa = { 0 };
507         struct obd_trans_info oti = {0};
508         struct lov_stripe_md  *empty_ea = NULL;
509         ENTRY;
510
511         LASSERT(mds->mds_lov_page_array != NULL);
512
513         /* This create will in fact either create or destroy:  If the OST is
514          * missing objects below this ID, they will be created.  If it finds
515          * objects above this ID, they will be removed. */
516         memset(&oa, 0, sizeof(oa));
517         oa.o_flags = OBD_FL_DELORPHAN;
518         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
519         if (ost_uuid != NULL)
520                 oti.oti_ost_uuid = ost_uuid;
521
522         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
523
524         RETURN(rc);
525 }
526
527 /* for one target */
528 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
529 {
530         struct mds_obd *mds = &obd->u.mds;
531         int rc;
532         struct obd_id_info info;
533         ENTRY;
534
535         LASSERT(!obd->obd_recovering);
536
537         info.idx = idx;
538         info.data = id;
539         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
540                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
541         if (rc)
542                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
543                         obd->obd_name, rc);
544
545         RETURN(rc);
546 }
547
548 /* Update the lov desc for a new size lov. */
549 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
550                                struct obd_uuid *uuid)
551 {
552         struct mds_obd *mds = &obd->u.mds;
553         struct lov_desc *ld;
554         __u32 valsize = sizeof(mds->mds_lov_desc);
555         int rc = 0;
556         ENTRY;
557
558         OBD_ALLOC(ld, sizeof(*ld));
559         if (!ld)
560                 RETURN(-ENOMEM);
561
562         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
563                           &valsize, ld, NULL);
564         if (rc)
565                 GOTO(out, rc);
566
567         /* Don't change the mds_lov_desc until the objids size matches the
568            count (paranoia) */
569         mds->mds_lov_desc = *ld;
570         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
571                mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
572
573         mutex_down(&obd->obd_dev_sem);
574         rc = mds_lov_update_max_ost(mds, index);
575         mutex_up(&obd->obd_dev_sem);
576         if (rc)
577                 GOTO(out, rc = -ENOMEM);
578
579         /* If we added a target we have to reconnect the llogs */
580         /* We only _need_ to do this at first add (idx), or the first time
581            after recovery.  However, it should now be safe to call anytime. */
582         rc = obd_llog_init(obd, obd, (void *)&index); 
583
584 out:
585         OBD_FREE(ld, sizeof(*ld));
586         RETURN(rc);
587 }
588
589
590 /* Inform MDS about new/updated target */
591 static int mds_lov_update_mds(struct obd_device *obd,
592                               struct obd_device *watched,
593                               __u32 idx)
594 {
595         struct mds_obd *mds = &obd->u.mds;
596         __u32 old_count;
597         int rc = 0;
598         int page;
599         int off;
600         obd_id *data;
601
602         ENTRY;
603
604         /* Don't let anyone else mess with mds_lov_objids now */
605         old_count = mds->mds_lov_desc.ld_tgt_count;
606         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
607         if (rc)
608                 GOTO(out, rc);
609
610         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
611                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
612                mds->mds_lov_desc.ld_tgt_count);
613
614         /* idx is set as data from lov_notify. */
615         if (obd->obd_recovering)
616                 GOTO(out, rc);
617
618         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
619                 CERROR("index %d > count %d!\n", idx,
620                        mds->mds_lov_desc.ld_tgt_count);
621                 GOTO(out, rc = -EINVAL);
622         }
623
624         rc = mds_lov_get_objid(obd, idx);
625         if (rc) {
626                 CERROR("Failed to get objid - %d\n", rc);
627                 GOTO(out, rc);
628         }
629
630         page = idx / OBJID_PER_PAGE();
631         off = idx % OBJID_PER_PAGE();
632         data = mds->mds_lov_page_array[page];
633         /* We have read this lastid from disk; tell the osc.
634            Don't call this during recovery. */
635         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
636         if (rc) {
637                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
638                 /* Don't abort the rest of the sync */
639                 rc = 0;
640         }
641
642         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
643                data[off], idx, rc);
644 out:
645         RETURN(rc);
646 }
647
648 /* update the LOV-OSC knowledge of the last used object id's */
649 int mds_lov_connect(struct obd_device *obd, char * lov_name)
650 {
651         struct mds_obd *mds = &obd->u.mds;
652         struct lustre_handle conn = {0,};
653         struct obd_connect_data *data;
654         int rc;
655         ENTRY;
656
657         if (IS_ERR(mds->mds_osc_obd))
658                 RETURN(PTR_ERR(mds->mds_osc_obd));
659
660         if (mds->mds_osc_obd)
661                 RETURN(0);
662
663         mds->mds_osc_obd = class_name2obd(lov_name);
664         if (!mds->mds_osc_obd) {
665                 CERROR("MDS cannot locate LOV %s\n", lov_name);
666                 GOTO(error_exit, rc = -ENOTCONN);
667         }
668
669         mutex_down(&obd->obd_dev_sem);
670         rc = mds_lov_read_objids(obd);
671         mutex_up(&obd->obd_dev_sem);
672         if (rc) {
673                 CERROR("cannot read lov_objids: rc = %d\n", rc);
674                 GOTO(error_exit, rc);
675         }
676
677         /* Deny new client connections until we are sure we have some OSTs */
678         obd->obd_no_conn = 1;
679
680         rc = obd_register_observer(mds->mds_osc_obd, obd);
681         if (rc) {
682                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
683                        lov_name, rc);
684                 GOTO(error_exit, rc);
685         }
686
687         rc = obd_llog_init(obd, obd, NULL); 
688         if (rc) {
689                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
690                        lov_name, rc);
691                 GOTO(error_exit, rc);
692         }
693
694         OBD_ALLOC(data, sizeof(*data));
695         if (data == NULL)
696                 RETURN(-ENOMEM);
697         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
698                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
699                 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
700                 OBD_CONNECT_SKIP_ORPHAN;
701 #ifdef HAVE_LRU_RESIZE_SUPPORT
702         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
703 #endif
704         data->ocd_version = LUSTRE_VERSION_CODE;
705         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
706         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, &mds->mds_osc_exp);
707         OBD_FREE(data, sizeof(*data));
708         if (rc) {
709                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
710                 GOTO(error_exit, rc);
711         }
712         /* we not want postrecov in case clean fs, in other cases postrecov will
713          * be called from ldlm. otherwise we can call postrecov twice - in case
714          * short recovery */
715
716         RETURN(rc);
717
718 error_exit:
719         mds->mds_osc_exp = NULL;
720         mds->mds_osc_obd = ERR_PTR(rc);
721         RETURN(rc);
722 }
723
724 int mds_lov_disconnect(struct obd_device *obd)
725 {
726         struct mds_obd *mds = &obd->u.mds;
727         int rc = 0;
728         ENTRY;
729
730         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
731                 obd_register_observer(mds->mds_osc_obd, NULL);
732
733                 /* The actual disconnect of the mds_lov will be called from
734                  * class_disconnect_exports from mds_lov_clean. So we have to
735                  * ensure that class_cleanup doesn't fail due to the extra ref
736                  * we're holding now. The mechanism to do that already exists -
737                  * the obd_force flag. We'll drop the final ref to the
738                  * mds_osc_exp in mds_cleanup. */
739                 mds->mds_osc_obd->obd_force = 1;
740         }
741
742         RETURN(rc);
743 }
744
745 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
746                   void *karg, void *uarg)
747 {
748         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
749         struct obd_device *obd = exp->exp_obd;
750         struct mds_obd *mds = &obd->u.mds;
751         struct obd_ioctl_data *data = karg;
752         struct lvfs_run_ctxt saved;
753         int rc = 0;
754
755         ENTRY;
756         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
757
758         switch (cmd) {
759         case OBD_IOC_RECORD: {
760                 char *name = data->ioc_inlbuf1;
761                 struct llog_ctxt *ctxt;
762
763                 if (mds->mds_cfg_llh)
764                         RETURN(-EBUSY);
765
766                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
767                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
768                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
769                 llog_ctxt_put(ctxt);
770                 if (rc == 0)
771                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
772                                          &cfg_uuid);
773                 else
774                         mds->mds_cfg_llh = NULL;
775                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
776
777                 RETURN(rc);
778         }
779
780         case OBD_IOC_ENDRECORD: {
781                 if (!mds->mds_cfg_llh)
782                         RETURN(-EBADF);
783
784                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
785                 rc = llog_close(mds->mds_cfg_llh);
786                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
787
788                 mds->mds_cfg_llh = NULL;
789                 RETURN(rc);
790         }
791
792         case OBD_IOC_CLEAR_LOG: {
793                 char *name = data->ioc_inlbuf1;
794                 struct llog_ctxt *ctxt;
795                 if (mds->mds_cfg_llh)
796                         RETURN(-EBUSY);
797
798                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
799                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
800                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
801                 llog_ctxt_put(ctxt);
802                 if (rc == 0) {
803                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
804                                          NULL);
805
806                         rc = llog_destroy(mds->mds_cfg_llh);
807                         llog_free_handle(mds->mds_cfg_llh);
808                 }
809                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
810
811                 mds->mds_cfg_llh = NULL;
812                 RETURN(rc);
813         }
814
815         case OBD_IOC_DORECORD: {
816                 char *cfg_buf;
817                 struct llog_rec_hdr rec;
818                 if (!mds->mds_cfg_llh)
819                         RETURN(-EBADF);
820
821                 rec.lrh_len = llog_data_len(data->ioc_plen1);
822
823                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
824                         rec.lrh_type = OBD_CFG_REC;
825                 } else {
826                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
827                         RETURN(-EINVAL);
828                 }
829
830                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
831                 if (cfg_buf == NULL)
832                         RETURN(-EINVAL);
833                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
834                 if (rc) {
835                         OBD_FREE(cfg_buf, data->ioc_plen1);
836                         RETURN(rc);
837                 }
838
839                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
840                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
841                                     cfg_buf, -1);
842                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
843
844                 OBD_FREE(cfg_buf, data->ioc_plen1);
845                 RETURN(rc);
846         }
847
848         case OBD_IOC_PARSE: {
849                 struct llog_ctxt *ctxt =
850                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
851                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
852                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
853                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
854                 llog_ctxt_put(ctxt);
855                 if (rc)
856                         RETURN(rc);
857
858                 RETURN(rc);
859         }
860
861         case OBD_IOC_DUMP_LOG: {
862                 struct llog_ctxt *ctxt =
863                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
864                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
865                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
866                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
867                 llog_ctxt_put(ctxt);
868                 if (rc)
869                         RETURN(rc);
870
871                 RETURN(rc);
872         }
873
874         case OBD_IOC_SYNC: {
875                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
876                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
877                 RETURN(rc);
878         }
879
880         case OBD_IOC_SET_READONLY: {
881                 void *handle;
882                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
883                 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
884                        obd->obd_name, obd->u.obt.obt_sb->s_id);
885
886                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
887                 if (!IS_ERR(handle))
888                         rc = fsfilt_commit(obd, inode, handle, 1);
889
890                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
891                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
892
893                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
894                 RETURN(0);
895         }
896
897         case OBD_IOC_CATLOGLIST: {
898                 int count = mds->mds_lov_desc.ld_tgt_count;
899                 rc = llog_catalog_list(obd, count, data);
900                 RETURN(rc);
901
902         }
903         case OBD_IOC_LLOG_CHECK:
904         case OBD_IOC_LLOG_CANCEL:
905         case OBD_IOC_LLOG_REMOVE: {
906                 struct llog_ctxt *ctxt =
907                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
908                 int rc2;
909
910                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
911                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
912                 rc = llog_ioctl(ctxt, cmd, data);
913                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
914
915                 rc = obd_llog_init(obd, obd, NULL);
916                 llog_ctxt_put(ctxt);
917                 rc2 = obd_set_info_async(mds->mds_osc_exp,
918                                          sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
919                                          0, NULL, NULL);
920                 if (!rc)
921                         rc = rc2;
922                 RETURN(rc);
923         }
924         case OBD_IOC_LLOG_INFO:
925         case OBD_IOC_LLOG_PRINT: {
926                 struct llog_ctxt *ctxt =
927                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
928
929                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
930                 rc = llog_ioctl(ctxt, cmd, data);
931                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
932                 llog_ctxt_put(ctxt);
933
934                 RETURN(rc);
935         }
936
937         case OBD_IOC_ABORT_RECOVERY:
938                 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
939                 target_abort_recovery(obd);
940                 /* obd_recovering has been changed */
941                 mds_allow_cli(obd, 0);
942                 RETURN(0);
943
944         case OBD_IOC_GET_OBJ_VERSION: {
945                 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
946                 struct dentry *dentry;
947                 struct lustre_handle lockh;
948                 int lockm = LCK_CR;
949                 __u64 version;
950
951                 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
952                                                NULL, 0, MDS_INODELOCK_UPDATE);
953                 if (IS_ERR(dentry)) {
954                         rc = PTR_ERR(dentry);
955                         RETURN(rc);
956                 }
957                 version = fsfilt_get_version(obd, dentry->d_inode);
958                 ldlm_lock_decref(&lockh, lockm);
959                 l_dput(dentry);
960                 if (version < 0)
961                         rc = (int) version;
962                 else
963                         *(__u64 *) data->ioc_inlbuf2 = version;
964                 RETURN(rc);
965         }
966
967         default:
968                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
969                 RETURN(-EINVAL);
970         }
971         RETURN(0);
972
973 }
974
975 /* Collect the preconditions we need to allow client connects */
976 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
977 {
978         if (flag & CONFIG_LOG)
979                 obd->u.mds.mds_fl_cfglog = 1;
980         if (flag & CONFIG_SYNC)
981                 obd->u.mds.mds_fl_synced = 1;
982         if (flag & CONFIG_TARGET)
983                 obd->u.mds.mds_fl_target = 1;
984         if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
985             (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
986                 /* Open for clients */
987                 obd->obd_no_conn = 0;
988 }
989
990 struct mds_lov_sync_info {
991         struct obd_device *mlsi_obd;     /* the lov device to sync */
992         struct obd_device *mlsi_watched; /* target osc */
993         __u32              mlsi_index;   /* index of target */
994 };
995
996 /* We only sync one osc at a time, so that we don't have to hold
997    any kind of lock on the whole mds_lov_desc, which may change
998    (grow) as a result of mds_lov_add_ost.  This also avoids any
999    kind of mismatch between the lov_desc and the mds_lov_desc,
1000    which are not in lock-step during lov_add_obd */
1001 static int __mds_lov_synchronize(void *data)
1002 {
1003         struct mds_lov_sync_info *mlsi = data;
1004         struct obd_device *obd = mlsi->mlsi_obd;
1005         struct obd_device *watched = mlsi->mlsi_watched;
1006         struct mds_obd *mds = &obd->u.mds;
1007         struct obd_uuid *uuid;
1008         __u32  idx = mlsi->mlsi_index;
1009         struct llog_ctxt *ctxt;
1010         int rc = 0;
1011         ENTRY;
1012
1013         OBD_FREE(mlsi, sizeof(*mlsi));
1014
1015         LASSERT(obd);
1016         LASSERT(watched);
1017         uuid = &watched->u.cli.cl_target_uuid;
1018         LASSERT(uuid);
1019
1020         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1021
1022         rc = mds_lov_update_mds(obd, watched, idx);
1023         if (rc != 0) {
1024                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1025                 GOTO(out, rc);
1026         }
1027
1028         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
1029                                 KEY_MDS_CONN, 0, uuid, NULL);
1030         if (rc != 0)
1031                 GOTO(out, rc);
1032
1033         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1034         if (!ctxt)
1035               RETURN(-ENODEV);
1036
1037         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
1038
1039         rc = llog_connect(ctxt, NULL, NULL, uuid);
1040         llog_ctxt_put(ctxt);
1041
1042         if (rc != 0) {
1043                 CERROR("%s failed at llog_origin_connect: %d\n",
1044                        obd_uuid2str(uuid), rc);
1045                 GOTO(out, rc);
1046         }
1047
1048         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1049               obd->obd_name, obd_uuid2str(uuid));
1050
1051         if (obd->obd_stopping)
1052                 GOTO(out, rc = -ENODEV);
1053
1054         rc = mds_lov_clear_orphans(mds, uuid);
1055         if (rc != 0) {
1056                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1057                        obd_uuid2str(uuid), rc);
1058                 GOTO(out, rc);
1059         }
1060
1061         EXIT;
1062 out:
1063         if (rc) {
1064                 /* Deactivate it for safety */
1065                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
1066                        rc);
1067                 if (!obd->obd_stopping && mds->mds_osc_obd &&
1068                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
1069                         obd_notify(mds->mds_osc_obd, watched,
1070                                    OBD_NOTIFY_INACTIVE, NULL);
1071         } else {
1072                 /* We've successfully synced at least 1 OST and are ready
1073                    to handle client requests */
1074                 mds_allow_cli(obd, CONFIG_SYNC);
1075         }
1076
1077         class_decref(obd);
1078         return rc;
1079 }
1080
1081 int mds_lov_synchronize(void *data)
1082 {
1083         struct mds_lov_sync_info *mlsi = data;
1084         char name[20];
1085
1086         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1087         cfs_daemonize_ctxt(name);
1088
1089         RETURN(__mds_lov_synchronize(data));
1090 }
1091
1092 int mds_lov_start_synchronize(struct obd_device *obd,
1093                               struct obd_device *watched,
1094                               void *data, int nonblock)
1095 {
1096         struct mds_lov_sync_info *mlsi;
1097         int rc;
1098         struct obd_uuid *uuid;
1099         ENTRY;
1100
1101         LASSERT(watched);
1102         uuid = &watched->u.cli.cl_target_uuid;
1103
1104         OBD_ALLOC(mlsi, sizeof(*mlsi));
1105         if (mlsi == NULL)
1106                 RETURN(-ENOMEM);
1107
1108         LASSERT(data);
1109         mlsi->mlsi_obd = obd;
1110         mlsi->mlsi_watched = watched;
1111         mlsi->mlsi_index = *(__u32 *)data;
1112
1113         /* Although class_export_get(obd->obd_self_export) would lock
1114            the MDS in place, since it's only a self-export
1115            it doesn't lock the LOV in place.  The LOV can be disconnected
1116            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1117            Simply taking an export ref on the LOV doesn't help, because it's
1118            still disconnected. Taking an obd reference insures that we don't
1119            disconnect the LOV.  This of course means a cleanup won't
1120            finish for as long as the sync is blocking. */
1121         class_incref(obd);
1122
1123         if (nonblock) {
1124                 /* Synchronize in the background */
1125                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1126                                        CLONE_VM | CLONE_FILES);
1127                 if (rc < 0) {
1128                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1129                                obd->obd_name, rc);
1130                         class_decref(obd);
1131                 } else {
1132                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1133                                "thread=%d\n", obd->obd_name,
1134                                mlsi->mlsi_index, rc);
1135                         rc = 0;
1136                 }
1137         } else {
1138                 rc = __mds_lov_synchronize((void *)mlsi);
1139         }
1140
1141         RETURN(rc);
1142 }
1143
1144 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1145                enum obd_notify_event ev, void *data)
1146 {
1147         int rc = 0;
1148         ENTRY;
1149
1150         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1151
1152         switch (ev) {
1153         /* We only handle these: */
1154         case OBD_NOTIFY_ACTIVE:
1155                 /* lov want one or more _active_ targets for work */
1156                 mds_allow_cli(obd, CONFIG_TARGET);
1157                 /* activate event should be pass lov idx as argument */
1158         case OBD_NOTIFY_SYNC:
1159         case OBD_NOTIFY_SYNC_NONBLOCK:
1160                 /* sync event should be pass lov idx as argument */
1161                 break;
1162         case OBD_NOTIFY_CONFIG:
1163                 mds_allow_cli(obd, (unsigned long)data);
1164                 /* call this only when config is processed and stale_export_age
1165                  * value is configured */
1166                 class_disconnect_expired_exports(obd);
1167                 /* quota_type has been processed, we can now handle
1168                  * incoming quota requests */
1169                 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
1170         default:
1171                 RETURN(0);
1172         }
1173
1174         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1175                 CERROR("unexpected notification of %s %s!\n",
1176                        watched->obd_type->typ_name, watched->obd_name);
1177                 RETURN(-EINVAL);
1178         }
1179
1180         if (obd->obd_recovering) {
1181                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1182                       obd->obd_name,
1183                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1184                 /* We still have to fix the lov descriptor for ost's added
1185                    after the mdt in the config log.  They didn't make it into
1186                    mds_lov_connect. */
1187                 LASSERT(data);
1188                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1189                                           &watched->u.cli.cl_target_uuid);
1190
1191                 mds_allow_cli(obd, CONFIG_SYNC);
1192                 RETURN(rc);
1193         }
1194
1195         rc = mds_lov_start_synchronize(obd, watched, data,
1196                                        !(ev == OBD_NOTIFY_SYNC));
1197
1198         if (likely(obd->obd_stopping == 0))
1199                 lquota_recovery(mds_quota_interface_ref, obd);
1200
1201         RETURN(rc);
1202 }
1203
1204 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1205                        int *size)
1206 {
1207         struct lov_desc *ldesc;
1208         ENTRY;
1209
1210         ldesc = &obd->u.mds.mds_lov_desc;
1211         LASSERT(ldesc != NULL);
1212
1213         if (!lmm)
1214                 RETURN(0);
1215
1216         lmm->lmm_magic = LOV_MAGIC_V1;
1217         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1218         lmm->lmm_pattern = ldesc->ld_pattern;
1219         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1220         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1221         *size = sizeof(struct lov_mds_md);
1222
1223         RETURN(sizeof(struct lov_mds_md));
1224 }
1225
1226 /* Convert the on-disk LOV EA structre.
1227  * We always try to convert from an old LOV EA format to the common in-memory
1228  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1229  * then convert back to the new on-disk format and save it back to disk
1230  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1231  * to convert it each time this inode is accessed.
1232  *
1233  * This function is a bit interesting in the error handling.  We can safely
1234  * ship the old lmm to the client in case of failure, since it uses the same
1235  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1236  * reason.  We will not delete the old lmm data until we have written the
1237  * new format lmm data in fsfilt_set_md(). */
1238 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1239                        struct lov_mds_md *lmm, int lmm_size,
1240                        __u64 connect_flags)
1241 {
1242         struct lov_stripe_md *lsm = NULL;
1243         void *handle;
1244         int rc, err;
1245         ENTRY;
1246
1247         if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1248             (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1249                 /* client does not support LOV_MAGIC_V3, so we have to convert
1250                  * it to V1
1251                  * we convert the lmm from v3 to v1
1252                  * and return the new size (which is smaller)
1253                  * the caller supports this way to return the new size */
1254                 int new_lmm_size;
1255
1256                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1257                 /* lmm_stripe_count for non reg files is not used or -1 */
1258                 if (!S_ISREG(inode->i_mode)) {
1259                         new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1260                 } else {
1261                         __u32 lmm_stripe_count;
1262
1263                         lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1264                         new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1265                                                        LOV_MAGIC_V1);
1266                         /* move the objects to the new place */
1267                         memmove(lmm->lmm_objects,
1268                                 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1269                                 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1270                 }
1271                 /* even if new size is smaller than old one,
1272                  * this should not generate memory leak */
1273                 RETURN(new_lmm_size);
1274         }
1275
1276         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1277             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1278             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1279                 RETURN(0);
1280
1281         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1282                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1283                LOV_MAGIC);
1284
1285         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1286         if (rc < 0)
1287                 GOTO(conv_end, rc);
1288
1289         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1290         if (rc < 0)
1291                 GOTO(conv_free, rc);
1292         lmm_size = rc;
1293
1294         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1295         if (IS_ERR(handle)) {
1296                 rc = PTR_ERR(handle);
1297                 GOTO(conv_free, rc);
1298         }
1299
1300         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1301
1302         err = fsfilt_commit(obd, inode, handle, 0);
1303         if (!rc)
1304                 rc = err ? err : lmm_size;
1305         GOTO(conv_free, rc);
1306 conv_free:
1307         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1308 conv_end:
1309         return rc;
1310 }