Whamcloud - gitweb
1aefeed370ea067be7e8a0e99ea1bb450d6a04f0
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
52 #include <obd_lov.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
55
56 #include "mds_internal.h"
57
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
59
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         unsigned int i=0, j;
64
65         CDEBUG(D_INFO, "dump from %s\n", label);
66         if (mds->mds_lov_page_dirty == NULL) {
67                 CERROR("NULL bitmap!\n");
68                 GOTO(skip_bitmap, i);
69         }
70
71         for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i,j,data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107         if (mds->mds_lov_page_dirty == NULL)
108                 RETURN(-ENOMEM);
109
110
111         OBD_ALLOC(mds->mds_lov_page_array, size);
112         if (mds->mds_lov_page_array == NULL)
113                 GOTO(err_free_bitmap, rc = -ENOMEM);
114
115         /* open and test the lov objd file */
116         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
117         if (IS_ERR(file)) {
118                 rc = PTR_ERR(file);
119                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120                 GOTO(err_free, rc = PTR_ERR(file));
121         }
122         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124                        file->f_dentry->d_inode->i_mode);
125                 GOTO(err_open, rc = -ENOENT);
126         }
127         mds->mds_lov_objid_filp = file;
128
129         RETURN (0);
130 err_open:
131         if (filp_close((struct file *)file, 0))
132                 CERROR("can't close %s after error\n", LOV_OBJID);
133 err_free:
134         OBD_FREE(mds->mds_lov_page_array, size);
135 err_free_bitmap:
136         FREE_BITMAP(mds->mds_lov_page_dirty);
137
138         RETURN(rc);
139 }
140 EXPORT_SYMBOL(mds_lov_init_objids);
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
169
170 /**
171  * currently exist two ways for know about ost count and max ost index.
172  * first - after ost is connected to mds and sync process finished
173  * second - get from lmm in recovery process, in case when mds not have configs,
174  * and ost isn't registered in mgs.
175  *
176  * \param mds pointer to mds structure
177  * \param index maxium ost index
178  *
179  * \retval -ENOMEM is not hame memory for new page
180  * \retval 0 is update passed
181  */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
183 {
184         __u32 page = index / OBJID_PER_PAGE();
185         __u32 off = index % OBJID_PER_PAGE();
186         obd_id *data =  mds->mds_lov_page_array[page];
187
188         if (data == NULL) {
189                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
190                 if (data == NULL)
191                         RETURN(-ENOMEM);
192
193                 mds->mds_lov_page_array[page] = data;
194         }
195
196         if (index > mds->mds_lov_objid_max_index) {
197                 mds->mds_lov_objid_lastpage = page;
198                 mds->mds_lov_objid_lastidx = off;
199                 mds->mds_lov_objid_max_index = index;
200         }
201
202         /* workaround - New target not in objids file; increase mdsize */
203         /* ld_tgt_count is used as the max index everywhere, despite its name. */
204         if (data[off] == 0) {
205                 __u32 stripes;
206
207                 data[off] = 1;
208                 mds->mds_lov_objid_count++;
209                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210                                 mds->mds_lov_objid_count);
211
212                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215                        "stripes: %d/%d\n", mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize, stripes);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
224 {
225         __u32 page = index / OBJID_PER_PAGE();
226         __u32 off = index % OBJID_PER_PAGE();
227         obd_id *data =  mds->mds_lov_page_array[page];
228
229         return (data[off] > 0);
230 }
231
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
233 {
234         int rc = 0;
235         __u32 j;
236         struct lov_ost_data_v1 *lmm_objects;
237
238         /* if we create file without objects - lmm is NULL */
239         if (lmm == NULL)
240                 return 0;
241
242         mutex_down(&obd->obd_dev_sem);
243         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
244                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
245         else
246                 lmm_objects = lmm->lmm_objects;
247
248         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
249                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
250                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
251                         rc = -ENOMEM;
252                         break;
253                 }
254         }
255         mutex_up(&obd->obd_dev_sem);
256
257         RETURN(rc);
258 }
259 EXPORT_SYMBOL(mds_lov_prepare_objids);
260
261 /*
262  * write llog orphan record about lost ost object,
263  * Special lsm is allocated with single stripe, caller should deallocated it
264  * after use
265  */
266 static int mds_log_lost_precreated(struct obd_device *obd,
267                                    struct lov_stripe_md **lsmp, int *stripes,
268                                    obd_id id, obd_count count, int idx)
269 {
270         struct lov_stripe_md *lsm = *lsmp;
271         int rc;
272         ENTRY;
273
274         if (*lsmp == NULL) {
275                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
276                 if (rc < 0)
277                         RETURN(rc);
278                 /* need only one stripe, save old value */
279                 *stripes = lsm->lsm_stripe_count;
280                 lsm->lsm_stripe_count = 1;
281                 *lsmp = lsm;
282         }
283
284         lsm->lsm_oinfo[0]->loi_id = id;
285         lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
286         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
287
288         rc = mds_log_op_orphan(obd, lsm, count);
289         RETURN(rc);
290 }
291
292 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
293 {
294         struct mds_obd *mds = &obd->u.mds;
295         int j;
296         struct lov_ost_data_v1 *lmm_objects;
297 #ifndef HAVE_DELAYED_RECOVERY
298         struct lov_stripe_md *lsm = NULL;
299         int stripes = 0;
300 #endif
301         ENTRY;
302
303         /* if we create file without objects - lmm is NULL */
304         if (lmm == NULL)
305                 return;
306
307         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
308                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
309         else
310                 lmm_objects = lmm->lmm_objects;
311
312         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
313                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
314                 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
315                 __u32 page = i / OBJID_PER_PAGE();
316                 __u32 idx = i % OBJID_PER_PAGE();
317                 obd_id *data;
318
319                 data = mds->mds_lov_page_array[page];
320
321                 CDEBUG(D_INODE,"update last object for ost %u"
322                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
323                 if (id > data[idx]) {
324 #ifndef HAVE_DELAYED_RECOVERY
325                         int lost = id - data[idx] - 1;
326                         /* we might have lost precreated objects due to VBR */
327                         if (lost > 0 && obd->obd_recovering) {
328                                 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
329                                 if (!obd->obd_version_recov)
330                                         CWARN("Unexpected gap in objids\n");
331                                 /* lsm is allocated if NULL */
332                                 mds_log_lost_precreated(obd, &lsm, &stripes,
333                                                         data[idx] + 1, lost, i);
334                         }
335 #endif
336                         data[idx] = id;
337                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
338                 }
339         }
340 #ifndef HAVE_DELAYED_RECOVERY
341         if (lsm) {
342                 /* restore stripes number */
343                 lsm->lsm_stripe_count = stripes;
344                 obd_free_memmd(mds->mds_lov_exp, &lsm);
345         }
346 #endif
347         EXIT;
348         return;
349 }
350 EXPORT_SYMBOL(mds_lov_update_objids);
351
352 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
353                                     __u32 count)
354 {
355         __u32 i;
356         __u32 stripes;
357
358         for(i = 0; i < count; i++) {
359                 if (data[i] == 0)
360                         continue;
361
362                 mds->mds_lov_objid_count++;
363                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
364                                 mds->mds_lov_objid_count);
365
366                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
367                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
368                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
369                        "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
370         }
371         EXIT;
372         return 0;
373 }
374
375 static int mds_lov_read_objids(struct obd_device *obd)
376 {
377         struct mds_obd *mds = &obd->u.mds;
378         loff_t off = 0;
379         int i, rc = 0, count = 0, page = 0;
380         size_t size;
381         ENTRY;
382
383         /* Read everything in the file, even if our current lov desc
384            has fewer targets. Old targets not in the lov descriptor
385            during mds setup may still have valid objids. */
386         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
387         if (size == 0)
388                 RETURN(0);
389
390         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
391         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
392         for (i = 0; i < page; i++) {
393                 obd_id *data;
394                 loff_t off_old = off;
395
396                 LASSERT(mds->mds_lov_page_array[i] == NULL);
397                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
398                 if (mds->mds_lov_page_array[i] == NULL)
399                         GOTO(out, rc = -ENOMEM);
400
401                 data = mds->mds_lov_page_array[i];
402
403                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
404                                         MDS_LOV_ALLOC_SIZE, &off);
405                 if (rc < 0) {
406                         CERROR("Error reading objids %d\n", rc);
407                         GOTO(out, rc);
408                 }
409
410                 if (off == off_old) /* hole is read */
411                         off += MDS_LOV_ALLOC_SIZE;
412                 count = (off - off_old) / sizeof(obd_id);
413                 if (mds_lov_update_from_read(mds, data, count)) {
414                         CERROR("Can't update mds data\n");
415                         GOTO(out, rc = -EIO);
416                 }
417         }
418         mds->mds_lov_objid_lastpage = page - 1;
419         mds->mds_lov_objid_lastidx = count - 1;
420
421         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
422                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
423 out:
424         mds_lov_dump_objids("read",obd);
425
426         RETURN(rc);
427 }
428
429 int mds_lov_write_objids(struct obd_device *obd)
430 {
431         struct mds_obd *mds = &obd->u.mds;
432         int i = 0, rc = 0;
433         ENTRY;
434
435         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
436                 RETURN(0);
437
438         mds_lov_dump_objids("write", obd);
439
440         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
441                 obd_id *data =  mds->mds_lov_page_array[i];
442                 unsigned int size = MDS_LOV_ALLOC_SIZE;
443                 loff_t off = i * size;
444
445                 LASSERT(data != NULL);
446
447                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
448                         continue;
449
450                 /* check for particaly filled last page */
451                 if (i == mds->mds_lov_objid_lastpage)
452                         size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
453
454                 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
455                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
456                                          size, &off, 0);
457                 if (rc < 0) {
458                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
459                         break;
460                 }
461         }
462         if (rc >= 0)
463                 rc = 0;
464
465         RETURN(rc);
466 }
467 EXPORT_SYMBOL(mds_lov_write_objids);
468
469 static int mds_lov_get_objid(struct obd_device * obd,
470                              obd_id idx)
471 {
472         struct mds_obd *mds = &obd->u.mds;
473         struct obd_export *lov_exp = mds->mds_lov_exp;
474         unsigned int page;
475         unsigned int off;
476         obd_id *data;
477         __u32 size;
478         int rc = 0;
479         ENTRY;
480
481         LASSERT(lov_exp != NULL);
482
483         page = idx / OBJID_PER_PAGE();
484         off = idx % OBJID_PER_PAGE();
485
486         data = mds->mds_lov_page_array[page];
487
488         if (data[off] < 2) {
489                 /* We never read this lastid; ask the osc */
490                 struct obd_id_info lastid;
491
492                 size = sizeof(lastid);
493                 lastid.idx = idx;
494                 lastid.data = &data[off];
495                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID),
496                                   KEY_LAST_ID, &size, &lastid, NULL);
497                 if (rc)
498                         GOTO(out, rc);
499
500                 /* workaround for clean filter */
501                 if (data[off] == 0)
502                         data[off] = 1;
503
504                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
505         }
506         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
507                idx, data, page, off, data[off]);
508 out:
509         RETURN(rc);
510 }
511
512 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
513 {
514         int rc;
515         struct obdo oa = { 0 };
516         struct obd_trans_info oti = {0};
517         struct lov_stripe_md  *empty_ea = NULL;
518         ENTRY;
519
520         LASSERT(mds->mds_lov_page_array != NULL);
521
522         /* This create will in fact either create or destroy:  If the OST is
523          * missing objects below this ID, they will be created.  If it finds
524          * objects above this ID, they will be removed. */
525         memset(&oa, 0, sizeof(oa));
526         oa.o_flags = OBD_FL_DELORPHAN;
527         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
528         if (ost_uuid != NULL)
529                 oti.oti_ost_uuid = ost_uuid;
530
531         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
532
533         RETURN(rc);
534 }
535
536 /* for one target */
537 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
538 {
539         struct mds_obd *mds = &obd->u.mds;
540         int rc;
541         struct obd_id_info info;
542         ENTRY;
543
544         LASSERT(!obd->obd_recovering);
545
546         info.idx = idx;
547         info.data = id;
548         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
549                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
550         if (rc)
551                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
552                         obd->obd_name, rc);
553
554         RETURN(rc);
555 }
556
557 /* Update the lov desc for a new size lov. */
558 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
559                                struct obd_uuid *uuid)
560 {
561         struct mds_obd *mds = &obd->u.mds;
562         struct lov_desc *ld;
563         __u32 valsize = sizeof(mds->mds_lov_desc);
564         int rc = 0;
565         ENTRY;
566
567         OBD_ALLOC(ld, sizeof(*ld));
568         if (!ld)
569                 RETURN(-ENOMEM);
570
571         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
572                           &valsize, ld, NULL);
573         if (rc)
574                 GOTO(out, rc);
575
576         /* Don't change the mds_lov_desc until the objids size matches the
577            count (paranoia) */
578         mds->mds_lov_desc = *ld;
579         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
580                mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
581
582         mutex_down(&obd->obd_dev_sem);
583         rc = mds_lov_update_max_ost(mds, index);
584         mutex_up(&obd->obd_dev_sem);
585         if (rc)
586                 GOTO(out, rc = -ENOMEM);
587
588         /* If we added a target we have to reconnect the llogs */
589         /* We only _need_ to do this at first add (idx), or the first time
590            after recovery.  However, it should now be safe to call anytime. */
591         rc = obd_llog_init(obd, obd, (void *)&index); 
592
593 out:
594         OBD_FREE(ld, sizeof(*ld));
595         RETURN(rc);
596 }
597
598
599 /* Inform MDS about new/updated target */
600 static int mds_lov_update_mds(struct obd_device *obd,
601                               struct obd_device *watched,
602                               __u32 idx)
603 {
604         struct mds_obd *mds = &obd->u.mds;
605         __u32 old_count;
606         int rc = 0;
607         int page;
608         int off;
609         obd_id *data;
610
611         ENTRY;
612
613         /* Don't let anyone else mess with mds_lov_objids now */
614         old_count = mds->mds_lov_desc.ld_tgt_count;
615         LASSERT(mds_lov_objinit(mds, idx));
616
617         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
618                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
619                mds->mds_lov_desc.ld_tgt_count);
620
621         /* idx is set as data from lov_notify. */
622         if (obd->obd_recovering)
623                 GOTO(out, rc);
624
625         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
626                 CERROR("index %d > count %d!\n", idx,
627                        mds->mds_lov_desc.ld_tgt_count);
628                 GOTO(out, rc = -EINVAL);
629         }
630
631         rc = mds_lov_get_objid(obd, idx);
632         if (rc) {
633                 CERROR("Failed to get objid - %d\n", rc);
634                 GOTO(out, rc);
635         }
636
637         page = idx / OBJID_PER_PAGE();
638         off = idx % OBJID_PER_PAGE();
639         data = mds->mds_lov_page_array[page];
640         /* We have read this lastid from disk; tell the osc.
641            Don't call this during recovery. */
642         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
643         if (rc) {
644                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
645                 /* Don't abort the rest of the sync */
646                 rc = 0;
647         }
648
649         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
650                data[off], idx, rc);
651 out:
652         RETURN(rc);
653 }
654
655 /* update the LOV-OSC knowledge of the last used object id's */
656 int mds_lov_connect(struct obd_device *obd, char * lov_name)
657 {
658         struct mds_obd *mds = &obd->u.mds;
659         struct lustre_handle conn = {0,};
660         struct obd_connect_data *data;
661         int rc;
662         ENTRY;
663
664         if (IS_ERR(mds->mds_lov_obd))
665                 RETURN(PTR_ERR(mds->mds_lov_obd));
666
667         if (mds->mds_lov_obd)
668                 RETURN(0);
669
670         mds->mds_lov_obd = class_name2obd(lov_name);
671         if (!mds->mds_lov_obd) {
672                 CERROR("MDS cannot locate LOV %s\n", lov_name);
673                 GOTO(error_exit, rc = -ENOTCONN);
674         }
675
676         mutex_down(&obd->obd_dev_sem);
677         rc = mds_lov_read_objids(obd);
678         mutex_up(&obd->obd_dev_sem);
679         if (rc) {
680                 CERROR("cannot read lov_objids: rc = %d\n", rc);
681                 GOTO(error_exit, rc);
682         }
683
684         /* Deny new client connections until we are sure we have some OSTs */
685         obd->obd_no_conn = 1;
686
687         rc = obd_register_observer(mds->mds_lov_obd, obd);
688         if (rc) {
689                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
690                        lov_name, rc);
691                 GOTO(error_exit, rc);
692         }
693
694         OBD_ALLOC(data, sizeof(*data));
695         if (data == NULL)
696                 RETURN(-ENOMEM);
697         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
698                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
699                 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
700                 OBD_CONNECT_SKIP_ORPHAN;
701 #ifdef HAVE_LRU_RESIZE_SUPPORT
702         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
703 #endif
704         data->ocd_version = LUSTRE_VERSION_CODE;
705         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
706         rc = obd_connect(&conn, mds->mds_lov_obd, &obd->obd_uuid, data, &mds->mds_lov_exp);
707         OBD_FREE(data, sizeof(*data));
708         if (rc) {
709                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
710                 GOTO(error_exit, rc);
711         }
712         /* we not want postrecov in case clean fs, in other cases postrecov will
713          * be called from ldlm. otherwise we can call postrecov twice - in case
714          * short recovery */
715
716         RETURN(rc);
717
718 error_exit:
719         mds->mds_lov_exp = NULL;
720         mds->mds_lov_obd = ERR_PTR(rc);
721         RETURN(rc);
722 }
723
724 int mds_lov_disconnect(struct obd_device *obd)
725 {
726         struct mds_obd *mds = &obd->u.mds;
727         int rc = 0;
728         ENTRY;
729
730         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
731                 obd_register_observer(mds->mds_lov_obd, NULL);
732
733                 /* The actual disconnect of the mds_lov will be called from
734                  * class_disconnect_exports from mds_lov_clean. So we have to
735                  * ensure that class_cleanup doesn't fail due to the extra ref
736                  * we're holding now. The mechanism to do that already exists -
737                  * the obd_force flag. We'll drop the final ref to the
738                  * mds_lov_exp in mds_cleanup. */
739                 mds->mds_lov_obd->obd_force = 1;
740         }
741
742         RETURN(rc);
743 }
744
745 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
746                   void *karg, void *uarg)
747 {
748         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
749         struct obd_device *obd = exp->exp_obd;
750         struct mds_obd *mds = &obd->u.mds;
751         struct obd_ioctl_data *data = karg;
752         struct lvfs_run_ctxt saved;
753         int rc = 0;
754
755         ENTRY;
756         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
757
758         switch (cmd) {
759         case OBD_IOC_RECORD: {
760                 char *name = data->ioc_inlbuf1;
761                 struct llog_ctxt *ctxt;
762
763                 if (mds->mds_cfg_llh)
764                         RETURN(-EBUSY);
765
766                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
767                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
768                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
769                 llog_ctxt_put(ctxt);
770                 if (rc == 0)
771                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
772                                          &cfg_uuid);
773                 else
774                         mds->mds_cfg_llh = NULL;
775                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
776
777                 RETURN(rc);
778         }
779
780         case OBD_IOC_ENDRECORD: {
781                 if (!mds->mds_cfg_llh)
782                         RETURN(-EBADF);
783
784                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
785                 rc = llog_close(mds->mds_cfg_llh);
786                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
787
788                 mds->mds_cfg_llh = NULL;
789                 RETURN(rc);
790         }
791
792         case OBD_IOC_CLEAR_LOG: {
793                 char *name = data->ioc_inlbuf1;
794                 struct llog_ctxt *ctxt;
795                 if (mds->mds_cfg_llh)
796                         RETURN(-EBUSY);
797
798                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
799                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
800                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
801                 llog_ctxt_put(ctxt);
802                 if (rc == 0) {
803                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
804                                          NULL);
805
806                         rc = llog_destroy(mds->mds_cfg_llh);
807                         llog_free_handle(mds->mds_cfg_llh);
808                 }
809                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
810
811                 mds->mds_cfg_llh = NULL;
812                 RETURN(rc);
813         }
814
815         case OBD_IOC_DORECORD: {
816                 char *cfg_buf;
817                 struct llog_rec_hdr rec;
818                 if (!mds->mds_cfg_llh)
819                         RETURN(-EBADF);
820
821                 rec.lrh_len = llog_data_len(data->ioc_plen1);
822
823                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
824                         rec.lrh_type = OBD_CFG_REC;
825                 } else {
826                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
827                         RETURN(-EINVAL);
828                 }
829
830                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
831                 if (cfg_buf == NULL)
832                         RETURN(-EINVAL);
833                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
834                 if (rc) {
835                         OBD_FREE(cfg_buf, data->ioc_plen1);
836                         RETURN(rc);
837                 }
838
839                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
840                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
841                                     cfg_buf, -1);
842                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
843
844                 OBD_FREE(cfg_buf, data->ioc_plen1);
845                 RETURN(rc);
846         }
847
848         case OBD_IOC_PARSE: {
849                 struct llog_ctxt *ctxt =
850                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
851                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
852                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
853                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
854                 llog_ctxt_put(ctxt);
855                 if (rc)
856                         RETURN(rc);
857
858                 RETURN(rc);
859         }
860
861         case OBD_IOC_DUMP_LOG: {
862                 struct llog_ctxt *ctxt =
863                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
864                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
865                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
866                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
867                 llog_ctxt_put(ctxt);
868                 if (rc)
869                         RETURN(rc);
870
871                 RETURN(rc);
872         }
873
874         case OBD_IOC_SYNC: {
875                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
876                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
877                 RETURN(rc);
878         }
879
880         case OBD_IOC_SET_READONLY: {
881                 void *handle;
882                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
883                 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
884                        obd->obd_name, obd->u.obt.obt_sb->s_id);
885
886                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
887                 if (!IS_ERR(handle))
888                         rc = fsfilt_commit(obd, inode, handle, 1);
889
890                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
891                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
892
893                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
894                 RETURN(0);
895         }
896
897         case OBD_IOC_CATLOGLIST: {
898                 int count = mds->mds_lov_desc.ld_tgt_count;
899                 rc = llog_catalog_list(obd, count, data);
900                 RETURN(rc);
901
902         }
903         case OBD_IOC_LLOG_CHECK:
904         case OBD_IOC_LLOG_CANCEL:
905         case OBD_IOC_LLOG_REMOVE: {
906                 struct llog_ctxt *ctxt =
907                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
908                 int rc2;
909
910                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
911                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
912                 rc = llog_ioctl(ctxt, cmd, data);
913                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
914
915                 rc = obd_llog_init(obd, obd, NULL);
916                 llog_ctxt_put(ctxt);
917                 rc2 = obd_set_info_async(mds->mds_lov_exp,
918                                          sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
919                                          0, NULL, NULL);
920                 if (!rc)
921                         rc = rc2;
922                 RETURN(rc);
923         }
924         case OBD_IOC_LLOG_INFO:
925         case OBD_IOC_LLOG_PRINT: {
926                 struct llog_ctxt *ctxt =
927                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
928
929                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
930                 rc = llog_ioctl(ctxt, cmd, data);
931                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
932                 llog_ctxt_put(ctxt);
933
934                 RETURN(rc);
935         }
936
937         case OBD_IOC_ABORT_RECOVERY:
938                 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
939                 target_abort_recovery(obd);
940                 /* obd_recovering has been changed */
941                 mds_allow_cli(obd, 0);
942                 RETURN(0);
943
944         case OBD_IOC_GET_OBJ_VERSION: {
945                 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
946                 struct dentry *dentry;
947                 struct lustre_handle lockh;
948                 int lockm = LCK_CR;
949                 __u64 version;
950
951                 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
952                                                NULL, 0, MDS_INODELOCK_UPDATE);
953                 if (IS_ERR(dentry)) {
954                         rc = PTR_ERR(dentry);
955                         RETURN(rc);
956                 }
957                 version = fsfilt_get_version(obd, dentry->d_inode);
958                 ldlm_lock_decref(&lockh, lockm);
959                 l_dput(dentry);
960                 if (version < 0)
961                         rc = (int) version;
962                 else
963                         *(__u64 *) data->ioc_inlbuf2 = version;
964                 RETURN(rc);
965         }
966
967         default:
968                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
969                 RETURN(-EINVAL);
970         }
971         RETURN(0);
972
973 }
974
975 /* Collect the preconditions we need to allow client connects */
976 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
977 {
978         if (flag & CONFIG_LOG)
979                 obd->u.mds.mds_fl_cfglog = 1;
980         if (flag & CONFIG_SYNC)
981                 obd->u.mds.mds_fl_synced = 1;
982         if (flag & CONFIG_TARGET)
983                 obd->u.mds.mds_fl_target = 1;
984         if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
985             (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
986                 /* Open for clients */
987                 obd->obd_no_conn = 0;
988 }
989
990 struct mds_lov_sync_info {
991         struct obd_device *mlsi_obd;     /* the lov device to sync */
992         struct obd_device *mlsi_watched; /* target osc */
993         __u32              mlsi_index;   /* index of target */
994 };
995
996 /* We only sync one osc at a time, so that we don't have to hold
997    any kind of lock on the whole mds_lov_desc, which may change
998    (grow) as a result of mds_lov_add_ost.  This also avoids any
999    kind of mismatch between the lov_desc and the mds_lov_desc,
1000    which are not in lock-step during lov_add_obd */
1001 static int __mds_lov_synchronize(void *data)
1002 {
1003         struct mds_lov_sync_info *mlsi = data;
1004         struct obd_device *obd = mlsi->mlsi_obd;
1005         struct obd_device *watched = mlsi->mlsi_watched;
1006         struct mds_obd *mds = &obd->u.mds;
1007         struct obd_uuid *uuid;
1008         __u32  idx = mlsi->mlsi_index;
1009         struct llog_ctxt *ctxt;
1010         int rc = 0;
1011         ENTRY;
1012
1013         OBD_FREE(mlsi, sizeof(*mlsi));
1014
1015         LASSERT(obd);
1016         LASSERT(watched);
1017         uuid = &watched->u.cli.cl_target_uuid;
1018         LASSERT(uuid);
1019
1020         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1021
1022         rc = mds_lov_update_mds(obd, watched, idx);
1023         if (rc != 0) {
1024                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1025                 GOTO(out, rc);
1026         }
1027
1028         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
1029                                 KEY_MDS_CONN, 0, uuid, NULL);
1030         if (rc != 0)
1031                 GOTO(out, rc);
1032
1033         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1034         if (!ctxt)
1035               RETURN(-ENODEV);
1036
1037         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
1038
1039         rc = llog_connect(ctxt, NULL, NULL, uuid);
1040         llog_ctxt_put(ctxt);
1041
1042         if (rc != 0) {
1043                 CERROR("%s failed at llog_origin_connect: %d\n",
1044                        obd_uuid2str(uuid), rc);
1045                 GOTO(out, rc);
1046         }
1047
1048         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1049               obd->obd_name, obd_uuid2str(uuid));
1050
1051         if (obd->obd_stopping)
1052                 GOTO(out, rc = -ENODEV);
1053
1054         rc = mds_lov_clear_orphans(mds, uuid);
1055         if (rc != 0) {
1056                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1057                        obd_uuid2str(uuid), rc);
1058                 GOTO(out, rc);
1059         }
1060
1061         EXIT;
1062 out:
1063         if (rc) {
1064                 /* Deactivate it for safety */
1065                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
1066                        rc);
1067                 if (!obd->obd_stopping && mds->mds_lov_obd &&
1068                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
1069                         obd_notify(mds->mds_lov_obd, watched,
1070                                    OBD_NOTIFY_INACTIVE, NULL);
1071         } else {
1072                 /* We've successfully synced at least 1 OST and are ready
1073                    to handle client requests */
1074                 mds_allow_cli(obd, CONFIG_SYNC);
1075         }
1076
1077         class_decref(obd);
1078         return rc;
1079 }
1080
1081 int mds_lov_synchronize(void *data)
1082 {
1083         struct mds_lov_sync_info *mlsi = data;
1084         char name[20];
1085
1086         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1087         cfs_daemonize_ctxt(name);
1088
1089         RETURN(__mds_lov_synchronize(data));
1090 }
1091
1092 int mds_lov_start_synchronize(struct obd_device *obd,
1093                               struct obd_device *watched,
1094                               void *data, int nonblock)
1095 {
1096         struct mds_lov_sync_info *mlsi;
1097         int rc;
1098         struct obd_uuid *uuid;
1099         ENTRY;
1100
1101         LASSERT(watched);
1102         uuid = &watched->u.cli.cl_target_uuid;
1103
1104         OBD_ALLOC(mlsi, sizeof(*mlsi));
1105         if (mlsi == NULL)
1106                 RETURN(-ENOMEM);
1107
1108         LASSERT(data);
1109         mlsi->mlsi_obd = obd;
1110         mlsi->mlsi_watched = watched;
1111         mlsi->mlsi_index = *(__u32 *)data;
1112
1113         /* Although class_export_get(obd->obd_self_export) would lock
1114            the MDS in place, since it's only a self-export
1115            it doesn't lock the LOV in place.  The LOV can be disconnected
1116            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1117            Simply taking an export ref on the LOV doesn't help, because it's
1118            still disconnected. Taking an obd reference insures that we don't
1119            disconnect the LOV.  This of course means a cleanup won't
1120            finish for as long as the sync is blocking. */
1121         class_incref(obd);
1122
1123         if (nonblock) {
1124                 /* Synchronize in the background */
1125                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1126                                        CLONE_VM | CLONE_FILES);
1127                 if (rc < 0) {
1128                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1129                                obd->obd_name, rc);
1130                         class_decref(obd);
1131                 } else {
1132                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1133                                "thread=%d\n", obd->obd_name,
1134                                mlsi->mlsi_index, rc);
1135                         rc = 0;
1136                 }
1137         } else {
1138                 rc = __mds_lov_synchronize((void *)mlsi);
1139         }
1140
1141         RETURN(rc);
1142 }
1143
1144 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1145                enum obd_notify_event ev, void *data)
1146 {
1147         int rc = 0;
1148         ENTRY;
1149
1150         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1151
1152         switch (ev) {
1153         case OBD_NOTIFY_CREATE:
1154                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1155                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1156                 /* We still have to fix the lov descriptor for ost's */
1157                 LASSERT(data);
1158                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1159                                           &watched->u.cli.cl_target_uuid);
1160                 RETURN(rc);
1161         /* We only handle these: */
1162         case OBD_NOTIFY_ACTIVE:
1163                 /* lov want one or more _active_ targets for work */
1164                 mds_allow_cli(obd, CONFIG_TARGET);
1165                 /* activate event should be pass lov idx as argument */
1166         case OBD_NOTIFY_SYNC:
1167         case OBD_NOTIFY_SYNC_NONBLOCK:
1168                 /* sync event should be pass lov idx as argument */
1169                 break;
1170         case OBD_NOTIFY_CONFIG:
1171                 mds_allow_cli(obd, (unsigned long)data);
1172                 /* call this only when config is processed and stale_export_age
1173                  * value is configured */
1174                 class_disconnect_expired_exports(obd);
1175                 /* quota_type has been processed, we can now handle
1176                  * incoming quota requests */
1177                 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
1178         default:
1179                 RETURN(0);
1180         }
1181
1182         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1183                 CERROR("unexpected notification of %s %s!\n",
1184                        watched->obd_type->typ_name, watched->obd_name);
1185                 RETURN(-EINVAL);
1186         }
1187
1188         if (obd->obd_recovering) {
1189                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1190                       obd->obd_name,
1191                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1192                 mds_allow_cli(obd, CONFIG_SYNC);
1193                 RETURN(0);
1194         }
1195
1196         rc = mds_lov_start_synchronize(obd, watched, data,
1197                                        !(ev == OBD_NOTIFY_SYNC));
1198
1199         if (likely(obd->obd_stopping == 0))
1200                 lquota_recovery(mds_quota_interface_ref, obd);
1201
1202         RETURN(rc);
1203 }
1204
1205 int mds_get_default_md(struct obd_device *obd, struct lov_user_md *lum)
1206 {
1207         struct lov_desc *ldesc;
1208         int rc, size = sizeof(*ldesc);
1209         ENTRY;
1210
1211         if (!lum)
1212                 RETURN(0);
1213
1214         ldesc = &obd->u.mds.mds_lov_desc;
1215         LASSERT(ldesc != NULL);
1216
1217         rc = obd_get_info(obd->u.mds.mds_lov_exp, sizeof(KEY_LOVDESC),
1218                           KEY_LOVDESC, &size, ldesc, NULL);
1219         if (rc)
1220                 RETURN(rc);
1221
1222         lum->lmm_magic = LOV_MAGIC_V1;
1223         lum->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1224         lum->lmm_pattern = ldesc->ld_pattern;
1225         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
1226         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
1227         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
1228
1229         RETURN(sizeof(*lum));
1230 }
1231
1232 /* Convert the on-disk LOV EA structre.
1233  * We always try to convert from an old LOV EA format to the common in-memory
1234  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1235  * then convert back to the new on-disk format and save it back to disk
1236  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1237  * to convert it each time this inode is accessed.
1238  *
1239  * This function is a bit interesting in the error handling.  We can safely
1240  * ship the old lmm to the client in case of failure, since it uses the same
1241  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1242  * reason.  We will not delete the old lmm data until we have written the
1243  * new format lmm data in fsfilt_set_md(). */
1244 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1245                        struct lov_mds_md *lmm, int lmm_size,
1246                        __u64 connect_flags)
1247 {
1248         struct lov_stripe_md *lsm = NULL;
1249         void *handle;
1250         int rc, err;
1251         ENTRY;
1252
1253         if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1254             (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1255                 /* client does not support LOV_MAGIC_V3, so we have to convert
1256                  * it to V1
1257                  * we convert the lmm from v3 to v1
1258                  * and return the new size (which is smaller)
1259                  * the caller supports this way to return the new size */
1260                 int new_lmm_size;
1261
1262                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1263                 /* lmm_stripe_count for non reg files is not used or -1 */
1264                 if (!S_ISREG(inode->i_mode)) {
1265                         new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1266                 } else {
1267                         __u32 lmm_stripe_count;
1268
1269                         lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1270                         new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1271                                                        LOV_MAGIC_V1);
1272                         /* move the objects to the new place */
1273                         memmove(lmm->lmm_objects,
1274                                 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1275                                 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1276                 }
1277                 /* even if new size is smaller than old one,
1278                  * this should not generate memory leak */
1279                 RETURN(new_lmm_size);
1280         }
1281
1282         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1283             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1284             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1285                 RETURN(0);
1286
1287         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1288                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1289                LOV_MAGIC);
1290
1291         rc = obd_unpackmd(obd->u.mds.mds_lov_exp, &lsm, lmm, lmm_size);
1292         if (rc < 0)
1293                 GOTO(conv_end, rc);
1294
1295         rc = obd_packmd(obd->u.mds.mds_lov_exp, &lmm, lsm);
1296         if (rc < 0)
1297                 GOTO(conv_free, rc);
1298         lmm_size = rc;
1299
1300         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1301         if (IS_ERR(handle)) {
1302                 rc = PTR_ERR(handle);
1303                 GOTO(conv_free, rc);
1304         }
1305
1306         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1307
1308         err = fsfilt_commit(obd, inode, handle, 0);
1309         if (!rc)
1310                 rc = err ? err : lmm_size;
1311         GOTO(conv_free, rc);
1312 conv_free:
1313         obd_free_memmd(obd->u.mds.mds_lov_exp, &lsm);
1314 conv_end:
1315         return rc;
1316 }