Whamcloud - gitweb
add ability to resend request if it isn't fit in reply buffer.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
52 #include <obd_lov.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
55
56 #include "mds_internal.h"
57
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
59
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         unsigned int i=0, j;
64
65         CDEBUG(D_INFO, "dump from %s\n", label);
66         if (mds->mds_lov_page_dirty == NULL) {
67                 CERROR("NULL bitmap!\n");
68                 GOTO(skip_bitmap, i);
69         }
70
71         for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i,j,data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107         if (mds->mds_lov_page_dirty == NULL)
108                 RETURN(-ENOMEM);
109
110
111         OBD_ALLOC(mds->mds_lov_page_array, size);
112         if (mds->mds_lov_page_array == NULL)
113                 GOTO(err_free_bitmap, rc = -ENOMEM);
114
115         /* open and test the lov objd file */
116         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
117         if (IS_ERR(file)) {
118                 rc = PTR_ERR(file);
119                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120                 GOTO(err_free, rc = PTR_ERR(file));
121         }
122         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124                        file->f_dentry->d_inode->i_mode);
125                 GOTO(err_open, rc = -ENOENT);
126         }
127         mds->mds_lov_objid_filp = file;
128
129         RETURN (0);
130 err_open:
131         if (filp_close((struct file *)file, 0))
132                 CERROR("can't close %s after error\n", LOV_OBJID);
133 err_free:
134         OBD_FREE(mds->mds_lov_page_array, size);
135 err_free_bitmap:
136         FREE_BITMAP(mds->mds_lov_page_dirty);
137
138         RETURN(rc);
139 }
140 EXPORT_SYMBOL(mds_lov_init_objids);
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
169
170 /**
171  * currently exist two ways for know about ost count and max ost index.
172  * first - after ost is connected to mds and sync process finished
173  * second - get from lmm in recovery process, in case when mds not have configs,
174  * and ost isn't registered in mgs.
175  *
176  * \param mds pointer to mds structure
177  * \param index maxium ost index
178  *
179  * \retval -ENOMEM is not hame memory for new page
180  * \retval 0 is update passed
181  */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
183 {
184         __u32 page = index / OBJID_PER_PAGE();
185         __u32 off = index % OBJID_PER_PAGE();
186         obd_id *data =  mds->mds_lov_page_array[page];
187
188         if (data == NULL) {
189                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
190                 if (data == NULL)
191                         RETURN(-ENOMEM);
192
193                 mds->mds_lov_page_array[page] = data;
194         }
195
196         if (index > mds->mds_lov_objid_max_index) {
197                 mds->mds_lov_objid_lastpage = page;
198                 mds->mds_lov_objid_lastidx = off;
199                 mds->mds_lov_objid_max_index = index;
200         }
201
202         /* workaround - New target not in objids file; increase mdsize */
203         /* ld_tgt_count is used as the max index everywhere, despite its name. */
204         if (data[off] == 0) {
205                 __u32 stripes;
206
207                 data[off] = 1;
208                 mds->mds_lov_objid_count++;
209                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210                                 mds->mds_lov_objid_count);
211
212                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215                        "stripes: %d/%d\n", mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize, stripes);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
224 {
225         __u32 page = index / OBJID_PER_PAGE();
226         __u32 off = index % OBJID_PER_PAGE();
227         obd_id *data =  mds->mds_lov_page_array[page];
228
229         return (data[off] > 0);
230 }
231
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
233 {
234         int rc = 0;
235         __u32 j;
236         struct lov_ost_data_v1 *lmm_objects;
237
238         /* if we create file without objects - lmm is NULL */
239         if (lmm == NULL)
240                 return 0;
241
242         mutex_down(&obd->obd_dev_sem);
243         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
244                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
245         else
246                 lmm_objects = lmm->lmm_objects;
247
248         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
249                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
250                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
251                         rc = -ENOMEM;
252                         break;
253                 }
254         }
255         mutex_up(&obd->obd_dev_sem);
256
257         RETURN(rc);
258 }
259 EXPORT_SYMBOL(mds_lov_prepare_objids);
260
261 /*
262  * write llog orphan record about lost ost object,
263  * Special lsm is allocated with single stripe, caller should deallocated it
264  * after use
265  */
266 static int mds_log_lost_precreated(struct obd_device *obd,
267                                    struct lov_stripe_md **lsmp, int *stripes,
268                                    obd_id id, obd_count count, int idx)
269 {
270         struct lov_stripe_md *lsm = *lsmp;
271         int rc;
272         ENTRY;
273
274         if (*lsmp == NULL) {
275                 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
276                 if (rc < 0)
277                         RETURN(rc);
278                 /* need only one stripe, save old value */
279                 *stripes = lsm->lsm_stripe_count;
280                 lsm->lsm_stripe_count = 1;
281                 *lsmp = lsm;
282         }
283
284         lsm->lsm_oinfo[0]->loi_id = id;
285         lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
286         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
287
288         rc = mds_log_op_orphan(obd, lsm, count);
289         RETURN(rc);
290 }
291
292 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
293 {
294         struct mds_obd *mds = &obd->u.mds;
295         int j;
296         struct lov_ost_data_v1 *lmm_objects;
297 #ifndef HAVE_DELAYED_RECOVERY
298         struct lov_stripe_md *lsm = NULL;
299         int stripes = 0;
300 #endif
301         ENTRY;
302
303         /* if we create file without objects - lmm is NULL */
304         if (lmm == NULL)
305                 return;
306
307         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
308                 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
309         else
310                 lmm_objects = lmm->lmm_objects;
311
312         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
313                 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
314                 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
315                 __u32 page = i / OBJID_PER_PAGE();
316                 __u32 idx = i % OBJID_PER_PAGE();
317                 obd_id *data;
318
319                 data = mds->mds_lov_page_array[page];
320
321                 CDEBUG(D_INODE,"update last object for ost %u"
322                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
323                 if (id > data[idx]) {
324 #ifndef HAVE_DELAYED_RECOVERY
325                         int lost = id - data[idx] - 1;
326                         /* we might have lost precreated objects due to VBR */
327                         if (lost > 0 && obd->obd_recovering) {
328                                 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
329                                 if (!obd->obd_version_recov)
330                                         CWARN("Unexpected gap in objids\n");
331                                 /* lsm is allocated if NULL */
332                                 mds_log_lost_precreated(obd, &lsm, &stripes,
333                                                         data[idx] + 1, lost, i);
334                         }
335 #endif
336                         data[idx] = id;
337                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
338                 }
339         }
340 #ifndef HAVE_DELAYED_RECOVERY
341         if (lsm) {
342                 /* restore stripes number */
343                 lsm->lsm_stripe_count = stripes;
344                 obd_free_memmd(mds->mds_osc_exp, &lsm);
345         }
346 #endif
347         EXIT;
348         return;
349 }
350 EXPORT_SYMBOL(mds_lov_update_objids);
351
352 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
353                                     __u32 count)
354 {
355         __u32 i;
356         __u32 stripes;
357
358         for(i = 0; i < count; i++) {
359                 if (data[i] == 0)
360                         continue;
361
362                 mds->mds_lov_objid_count++;
363                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
364                                 mds->mds_lov_objid_count);
365
366                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
367                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
368                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
369                        "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
370         }
371         EXIT;
372         return 0;
373 }
374
375 static int mds_lov_read_objids(struct obd_device *obd)
376 {
377         struct mds_obd *mds = &obd->u.mds;
378         loff_t off = 0;
379         int i, rc = 0, count = 0, page = 0;
380         size_t size;
381         ENTRY;
382
383         /* Read everything in the file, even if our current lov desc
384            has fewer targets. Old targets not in the lov descriptor
385            during mds setup may still have valid objids. */
386         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
387         if (size == 0)
388                 RETURN(0);
389
390         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
391         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
392         for (i = 0; i < page; i++) {
393                 obd_id *data;
394                 loff_t off_old = off;
395
396                 LASSERT(mds->mds_lov_page_array[i] == NULL);
397                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
398                 if (mds->mds_lov_page_array[i] == NULL)
399                         GOTO(out, rc = -ENOMEM);
400
401                 data = mds->mds_lov_page_array[i];
402
403                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
404                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
405                 if (rc < 0) {
406                         CERROR("Error reading objids %d\n", rc);
407                         GOTO(out, rc);
408                 }
409
410                 count = (off - off_old) / sizeof(obd_id);
411                 if (mds_lov_update_from_read(mds, data, count)) {
412                         CERROR("Can't update mds data\n");
413                         GOTO(out, rc = -EIO);
414                 }
415
416                 if (off == off_old)
417                         break; /* eof */
418          }
419          mds->mds_lov_objid_lastpage = i;
420          mds->mds_lov_objid_lastidx = count;
421
422         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
423                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
424 out:
425         mds_lov_dump_objids("read",obd);
426
427         RETURN(rc);
428 }
429
430 int mds_lov_write_objids(struct obd_device *obd)
431 {
432         struct mds_obd *mds = &obd->u.mds;
433         int i = 0, rc = 0;
434         ENTRY;
435
436         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
437                 RETURN(0);
438
439         mds_lov_dump_objids("write", obd);
440
441         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
442                 obd_id *data =  mds->mds_lov_page_array[i];
443                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
444                 loff_t off = i * size;
445
446                 LASSERT(data != NULL);
447
448                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
449                         continue;
450
451                 /* check for particaly filled last page */
452                 if (i == mds->mds_lov_objid_lastpage)
453                         size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
454
455                 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
456                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
457                                          size, &off, 0);
458                 if (rc < 0) {
459                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
460                         break;
461                 }
462         }
463         if (rc >= 0)
464                 rc = 0;
465
466         RETURN(rc);
467 }
468 EXPORT_SYMBOL(mds_lov_write_objids);
469
470 static int mds_lov_get_objid(struct obd_device * obd,
471                              obd_id idx)
472 {
473         struct mds_obd *mds = &obd->u.mds;
474         struct obd_export *osc_exp = mds->mds_osc_exp;
475         unsigned int page;
476         unsigned int off;
477         obd_id *data;
478         int rc = 0;
479         ENTRY;
480
481         LASSERT(osc_exp != NULL);
482
483         page = idx / OBJID_PER_PAGE();
484         off = idx % OBJID_PER_PAGE();
485
486         data = mds->mds_lov_page_array[page];
487         if (data[off] < 2 ||
488             !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) {
489                 /* We never read this lastid; ask the osc */
490                 struct obd_id_info lastid;
491                 __u32 size = sizeof(lastid);
492
493                 lastid.idx = idx;
494                 lastid.data = &data[off];
495                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
496                                   KEY_LAST_ID, &size, &lastid, NULL);
497                 if (rc)
498                         GOTO(out, rc);
499
500                 /* workaround for clean filter */
501                 if (data[off] == 0)
502                         data[off] = 1;
503
504                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
505         }
506         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
507                idx, data, page, off, data[off]);
508 out:
509         RETURN(rc);
510 }
511
512 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
513 {
514         int rc;
515         struct obdo oa = { 0 };
516         struct obd_trans_info oti = {0};
517         struct lov_stripe_md  *empty_ea = NULL;
518         ENTRY;
519
520         LASSERT(mds->mds_lov_page_array != NULL);
521
522         /* This create will in fact either create or destroy:  If the OST is
523          * missing objects below this ID, they will be created.  If it finds
524          * objects above this ID, they will be removed. */
525         memset(&oa, 0, sizeof(oa));
526         oa.o_flags = OBD_FL_DELORPHAN;
527         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
528         if (ost_uuid != NULL)
529                 oti.oti_ost_uuid = ost_uuid;
530
531         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
532
533         RETURN(rc);
534 }
535
536 /* for one target */
537 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
538 {
539         struct mds_obd *mds = &obd->u.mds;
540         int rc;
541         struct obd_id_info info;
542         ENTRY;
543
544         LASSERT(!obd->obd_recovering);
545
546         info.idx = idx;
547         info.data = id;
548         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
549                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
550         if (rc)
551                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
552                         obd->obd_name, rc);
553
554         RETURN(rc);
555 }
556
557 /* Update the lov desc for a new size lov. */
558 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
559                                struct obd_uuid *uuid)
560 {
561         struct mds_obd *mds = &obd->u.mds;
562         struct lov_desc *ld;
563         __u32 valsize = sizeof(mds->mds_lov_desc);
564         int rc = 0;
565         ENTRY;
566
567         OBD_ALLOC(ld, sizeof(*ld));
568         if (!ld)
569                 RETURN(-ENOMEM);
570
571         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
572                           &valsize, ld, NULL);
573         if (rc)
574                 GOTO(out, rc);
575
576         /* Don't change the mds_lov_desc until the objids size matches the
577            count (paranoia) */
578         mds->mds_lov_desc = *ld;
579         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
580                mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
581
582         mutex_down(&obd->obd_dev_sem);
583         rc = mds_lov_update_max_ost(mds, index);
584         mutex_up(&obd->obd_dev_sem);
585         if (rc)
586                 GOTO(out, rc = -ENOMEM);
587
588         /* If we added a target we have to reconnect the llogs */
589         /* We only _need_ to do this at first add (idx), or the first time
590            after recovery.  However, it should now be safe to call anytime. */
591         rc = obd_llog_init(obd, obd, (void *)&index); 
592
593 out:
594         OBD_FREE(ld, sizeof(*ld));
595         RETURN(rc);
596 }
597
598
599 /* Inform MDS about new/updated target */
600 static int mds_lov_update_mds(struct obd_device *obd,
601                               struct obd_device *watched,
602                               __u32 idx)
603 {
604         struct mds_obd *mds = &obd->u.mds;
605         __u32 old_count;
606         int rc = 0;
607         int page;
608         int off;
609         obd_id *data;
610
611         ENTRY;
612
613         /* Don't let anyone else mess with mds_lov_objids now */
614         old_count = mds->mds_lov_desc.ld_tgt_count;
615         LASSERT(mds_lov_objinit(mds, idx));
616
617         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
618                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
619                mds->mds_lov_desc.ld_tgt_count);
620
621         /* idx is set as data from lov_notify. */
622         if (obd->obd_recovering)
623                 GOTO(out, rc);
624
625         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
626                 CERROR("index %d > count %d!\n", idx,
627                        mds->mds_lov_desc.ld_tgt_count);
628                 GOTO(out, rc = -EINVAL);
629         }
630
631         rc = mds_lov_get_objid(obd, idx);
632         if (rc) {
633                 CERROR("Failed to get objid - %d\n", rc);
634                 GOTO(out, rc);
635         }
636
637         page = idx / OBJID_PER_PAGE();
638         off = idx % OBJID_PER_PAGE();
639         data = mds->mds_lov_page_array[page];
640         /* We have read this lastid from disk; tell the osc.
641            Don't call this during recovery. */
642         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
643         if (rc) {
644                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
645                 /* Don't abort the rest of the sync */
646                 rc = 0;
647         }
648
649         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
650                data[off], idx, rc);
651 out:
652         RETURN(rc);
653 }
654
655 /* update the LOV-OSC knowledge of the last used object id's */
656 int mds_lov_connect(struct obd_device *obd, char * lov_name)
657 {
658         struct mds_obd *mds = &obd->u.mds;
659         struct lustre_handle conn = {0,};
660         struct obd_connect_data *data;
661         int rc;
662         ENTRY;
663
664         if (IS_ERR(mds->mds_osc_obd))
665                 RETURN(PTR_ERR(mds->mds_osc_obd));
666
667         if (mds->mds_osc_obd)
668                 RETURN(0);
669
670         mds->mds_osc_obd = class_name2obd(lov_name);
671         if (!mds->mds_osc_obd) {
672                 CERROR("MDS cannot locate LOV %s\n", lov_name);
673                 GOTO(error_exit, rc = -ENOTCONN);
674         }
675
676         mutex_down(&obd->obd_dev_sem);
677         rc = mds_lov_read_objids(obd);
678         mutex_up(&obd->obd_dev_sem);
679         if (rc) {
680                 CERROR("cannot read lov_objids: rc = %d\n", rc);
681                 GOTO(error_exit, rc);
682         }
683
684         /* Deny new client connections until we are sure we have some OSTs */
685         obd->obd_no_conn = 1;
686
687         rc = obd_register_observer(mds->mds_osc_obd, obd);
688         if (rc) {
689                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
690                        lov_name, rc);
691                 GOTO(error_exit, rc);
692         }
693
694         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
695          * targets */
696         obd_notify(obd->u.mds.mds_osc_obd, NULL, OBD_NOTIFY_CREATE, NULL);
697
698         OBD_ALLOC(data, sizeof(*data));
699         if (data == NULL)
700                 RETURN(-ENOMEM);
701         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
702                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
703                 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
704                 OBD_CONNECT_SKIP_ORPHAN;
705 #ifdef HAVE_LRU_RESIZE_SUPPORT
706         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
707 #endif
708         data->ocd_version = LUSTRE_VERSION_CODE;
709         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
710         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, &mds->mds_osc_exp);
711         OBD_FREE(data, sizeof(*data));
712         if (rc) {
713                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
714                 GOTO(error_exit, rc);
715         }
716         /* we not want postrecov in case clean fs, in other cases postrecov will
717          * be called from ldlm. otherwise we can call postrecov twice - in case
718          * short recovery */
719
720         RETURN(rc);
721
722 error_exit:
723         mds->mds_osc_exp = NULL;
724         mds->mds_osc_obd = ERR_PTR(rc);
725         RETURN(rc);
726 }
727
728 int mds_lov_disconnect(struct obd_device *obd)
729 {
730         struct mds_obd *mds = &obd->u.mds;
731         int rc = 0;
732         ENTRY;
733
734         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
735                 obd_register_observer(mds->mds_osc_obd, NULL);
736
737                 /* The actual disconnect of the mds_lov will be called from
738                  * class_disconnect_exports from mds_lov_clean. So we have to
739                  * ensure that class_cleanup doesn't fail due to the extra ref
740                  * we're holding now. The mechanism to do that already exists -
741                  * the obd_force flag. We'll drop the final ref to the
742                  * mds_osc_exp in mds_cleanup. */
743                 mds->mds_osc_obd->obd_force = 1;
744         }
745
746         RETURN(rc);
747 }
748
749 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
750                   void *karg, void *uarg)
751 {
752         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
753         struct obd_device *obd = exp->exp_obd;
754         struct mds_obd *mds = &obd->u.mds;
755         struct obd_ioctl_data *data = karg;
756         struct lvfs_run_ctxt saved;
757         int rc = 0;
758
759         ENTRY;
760         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
761
762         switch (cmd) {
763         case OBD_IOC_RECORD: {
764                 char *name = data->ioc_inlbuf1;
765                 struct llog_ctxt *ctxt;
766
767                 if (mds->mds_cfg_llh)
768                         RETURN(-EBUSY);
769
770                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
771                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
772                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
773                 llog_ctxt_put(ctxt);
774                 if (rc == 0)
775                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
776                                          &cfg_uuid);
777                 else
778                         mds->mds_cfg_llh = NULL;
779                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
780
781                 RETURN(rc);
782         }
783
784         case OBD_IOC_ENDRECORD: {
785                 if (!mds->mds_cfg_llh)
786                         RETURN(-EBADF);
787
788                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
789                 rc = llog_close(mds->mds_cfg_llh);
790                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
791
792                 mds->mds_cfg_llh = NULL;
793                 RETURN(rc);
794         }
795
796         case OBD_IOC_CLEAR_LOG: {
797                 char *name = data->ioc_inlbuf1;
798                 struct llog_ctxt *ctxt;
799                 if (mds->mds_cfg_llh)
800                         RETURN(-EBUSY);
801
802                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
803                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
804                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
805                 llog_ctxt_put(ctxt);
806                 if (rc == 0) {
807                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
808                                          NULL);
809
810                         rc = llog_destroy(mds->mds_cfg_llh);
811                         llog_free_handle(mds->mds_cfg_llh);
812                 }
813                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
814
815                 mds->mds_cfg_llh = NULL;
816                 RETURN(rc);
817         }
818
819         case OBD_IOC_DORECORD: {
820                 char *cfg_buf;
821                 struct llog_rec_hdr rec;
822                 if (!mds->mds_cfg_llh)
823                         RETURN(-EBADF);
824
825                 rec.lrh_len = llog_data_len(data->ioc_plen1);
826
827                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
828                         rec.lrh_type = OBD_CFG_REC;
829                 } else {
830                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
831                         RETURN(-EINVAL);
832                 }
833
834                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
835                 if (cfg_buf == NULL)
836                         RETURN(-EINVAL);
837                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
838                 if (rc) {
839                         OBD_FREE(cfg_buf, data->ioc_plen1);
840                         RETURN(rc);
841                 }
842
843                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
844                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
845                                     cfg_buf, -1);
846                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
847
848                 OBD_FREE(cfg_buf, data->ioc_plen1);
849                 RETURN(rc);
850         }
851
852         case OBD_IOC_PARSE: {
853                 struct llog_ctxt *ctxt =
854                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
855                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
856                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
857                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
858                 llog_ctxt_put(ctxt);
859                 if (rc)
860                         RETURN(rc);
861
862                 RETURN(rc);
863         }
864
865         case OBD_IOC_DUMP_LOG: {
866                 struct llog_ctxt *ctxt =
867                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
868                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
869                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
870                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
871                 llog_ctxt_put(ctxt);
872                 if (rc)
873                         RETURN(rc);
874
875                 RETURN(rc);
876         }
877
878         case OBD_IOC_SYNC: {
879                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
880                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
881                 RETURN(rc);
882         }
883
884         case OBD_IOC_SET_READONLY: {
885                 void *handle;
886                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
887                 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
888                        obd->obd_name, obd->u.obt.obt_sb->s_id);
889
890                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
891                 if (!IS_ERR(handle))
892                         rc = fsfilt_commit(obd, inode, handle, 1);
893
894                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
895                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
896
897                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
898                 RETURN(0);
899         }
900
901         case OBD_IOC_CATLOGLIST: {
902                 int count = mds->mds_lov_desc.ld_tgt_count;
903                 rc = llog_catalog_list(obd, count, data);
904                 RETURN(rc);
905
906         }
907         case OBD_IOC_LLOG_CHECK:
908         case OBD_IOC_LLOG_CANCEL:
909         case OBD_IOC_LLOG_REMOVE: {
910                 struct llog_ctxt *ctxt =
911                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
912                 int rc2;
913
914                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
915                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
916                 rc = llog_ioctl(ctxt, cmd, data);
917                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
918
919                 rc = obd_llog_init(obd, obd, NULL);
920                 llog_ctxt_put(ctxt);
921                 rc2 = obd_set_info_async(mds->mds_osc_exp,
922                                          sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
923                                          0, NULL, NULL);
924                 if (!rc)
925                         rc = rc2;
926                 RETURN(rc);
927         }
928         case OBD_IOC_LLOG_INFO:
929         case OBD_IOC_LLOG_PRINT: {
930                 struct llog_ctxt *ctxt =
931                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
932
933                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
934                 rc = llog_ioctl(ctxt, cmd, data);
935                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
936                 llog_ctxt_put(ctxt);
937
938                 RETURN(rc);
939         }
940
941         case OBD_IOC_ABORT_RECOVERY:
942                 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
943                 target_abort_recovery(obd);
944                 /* obd_recovering has been changed */
945                 mds_allow_cli(obd, 0);
946                 RETURN(0);
947
948         case OBD_IOC_GET_OBJ_VERSION: {
949                 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
950                 struct dentry *dentry;
951                 struct lustre_handle lockh;
952                 int lockm = LCK_CR;
953                 __u64 version;
954
955                 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
956                                                NULL, 0, MDS_INODELOCK_UPDATE);
957                 if (IS_ERR(dentry)) {
958                         rc = PTR_ERR(dentry);
959                         RETURN(rc);
960                 }
961                 version = fsfilt_get_version(obd, dentry->d_inode);
962                 ldlm_lock_decref(&lockh, lockm);
963                 l_dput(dentry);
964                 if (version < 0)
965                         rc = (int) version;
966                 else
967                         *(__u64 *) data->ioc_inlbuf2 = version;
968                 RETURN(rc);
969         }
970
971         default:
972                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
973                 RETURN(-EINVAL);
974         }
975         RETURN(0);
976
977 }
978
979 /* Collect the preconditions we need to allow client connects */
980 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
981 {
982         if (flag & CONFIG_LOG)
983                 obd->u.mds.mds_fl_cfglog = 1;
984         if (flag & CONFIG_SYNC)
985                 obd->u.mds.mds_fl_synced = 1;
986         if (flag & CONFIG_TARGET)
987                 obd->u.mds.mds_fl_target = 1;
988         if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
989             (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
990                 /* Open for clients */
991                 obd->obd_no_conn = 0;
992 }
993
994 struct mds_lov_sync_info {
995         struct obd_device *mlsi_obd;     /* the lov device to sync */
996         struct obd_device *mlsi_watched; /* target osc */
997         __u32              mlsi_index;   /* index of target */
998 };
999
1000 /* We only sync one osc at a time, so that we don't have to hold
1001    any kind of lock on the whole mds_lov_desc, which may change
1002    (grow) as a result of mds_lov_add_ost.  This also avoids any
1003    kind of mismatch between the lov_desc and the mds_lov_desc,
1004    which are not in lock-step during lov_add_obd */
1005 static int __mds_lov_synchronize(void *data)
1006 {
1007         struct mds_lov_sync_info *mlsi = data;
1008         struct obd_device *obd = mlsi->mlsi_obd;
1009         struct obd_device *watched = mlsi->mlsi_watched;
1010         struct mds_obd *mds = &obd->u.mds;
1011         struct obd_uuid *uuid;
1012         __u32  idx = mlsi->mlsi_index;
1013         struct llog_ctxt *ctxt;
1014         int rc = 0;
1015         ENTRY;
1016
1017         OBD_FREE(mlsi, sizeof(*mlsi));
1018
1019         LASSERT(obd);
1020         LASSERT(watched);
1021         uuid = &watched->u.cli.cl_target_uuid;
1022         LASSERT(uuid);
1023
1024         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1025
1026         rc = mds_lov_update_mds(obd, watched, idx);
1027         if (rc != 0) {
1028                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1029                 GOTO(out, rc);
1030         }
1031
1032         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
1033                                 KEY_MDS_CONN, 0, uuid, NULL);
1034         if (rc != 0)
1035                 GOTO(out, rc);
1036
1037         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1038         if (!ctxt)
1039               RETURN(-ENODEV);
1040
1041         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
1042
1043         rc = llog_connect(ctxt, NULL, NULL, uuid);
1044         llog_ctxt_put(ctxt);
1045
1046         if (rc != 0) {
1047                 CERROR("%s failed at llog_origin_connect: %d\n",
1048                        obd_uuid2str(uuid), rc);
1049                 GOTO(out, rc);
1050         }
1051
1052         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1053               obd->obd_name, obd_uuid2str(uuid));
1054
1055         if (obd->obd_stopping)
1056                 GOTO(out, rc = -ENODEV);
1057
1058         rc = mds_lov_clear_orphans(mds, uuid);
1059         if (rc != 0) {
1060                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1061                        obd_uuid2str(uuid), rc);
1062                 GOTO(out, rc);
1063         }
1064
1065         EXIT;
1066 out:
1067         if (rc) {
1068                 /* Deactivate it for safety */
1069                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
1070                        rc);
1071                 if (!obd->obd_stopping && mds->mds_osc_obd &&
1072                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
1073                         obd_notify(mds->mds_osc_obd, watched,
1074                                    OBD_NOTIFY_INACTIVE, NULL);
1075         } else {
1076                 /* We've successfully synced at least 1 OST and are ready
1077                    to handle client requests */
1078                 mds_allow_cli(obd, CONFIG_SYNC);
1079         }
1080
1081         class_decref(obd);
1082         return rc;
1083 }
1084
1085 int mds_lov_synchronize(void *data)
1086 {
1087         struct mds_lov_sync_info *mlsi = data;
1088         char name[20];
1089
1090         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1091         cfs_daemonize_ctxt(name);
1092
1093         RETURN(__mds_lov_synchronize(data));
1094 }
1095
1096 int mds_lov_start_synchronize(struct obd_device *obd,
1097                               struct obd_device *watched,
1098                               void *data, int nonblock)
1099 {
1100         struct mds_lov_sync_info *mlsi;
1101         int rc;
1102         struct obd_uuid *uuid;
1103         ENTRY;
1104
1105         LASSERT(watched);
1106         uuid = &watched->u.cli.cl_target_uuid;
1107
1108         OBD_ALLOC(mlsi, sizeof(*mlsi));
1109         if (mlsi == NULL)
1110                 RETURN(-ENOMEM);
1111
1112         LASSERT(data);
1113         mlsi->mlsi_obd = obd;
1114         mlsi->mlsi_watched = watched;
1115         mlsi->mlsi_index = *(__u32 *)data;
1116
1117         /* Although class_export_get(obd->obd_self_export) would lock
1118            the MDS in place, since it's only a self-export
1119            it doesn't lock the LOV in place.  The LOV can be disconnected
1120            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1121            Simply taking an export ref on the LOV doesn't help, because it's
1122            still disconnected. Taking an obd reference insures that we don't
1123            disconnect the LOV.  This of course means a cleanup won't
1124            finish for as long as the sync is blocking. */
1125         class_incref(obd);
1126
1127         if (nonblock) {
1128                 /* Synchronize in the background */
1129                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1130                                        CLONE_VM | CLONE_FILES);
1131                 if (rc < 0) {
1132                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1133                                obd->obd_name, rc);
1134                         class_decref(obd);
1135                 } else {
1136                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1137                                "thread=%d\n", obd->obd_name,
1138                                mlsi->mlsi_index, rc);
1139                         rc = 0;
1140                 }
1141         } else {
1142                 rc = __mds_lov_synchronize((void *)mlsi);
1143         }
1144
1145         RETURN(rc);
1146 }
1147
1148 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1149                enum obd_notify_event ev, void *data)
1150 {
1151         int rc = 0;
1152         ENTRY;
1153
1154         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1155
1156         switch (ev) {
1157         case OBD_NOTIFY_CREATE:
1158                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1159                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1160                 /* We still have to fix the lov descriptor for ost's */
1161                 LASSERT(data);
1162                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1163                                           &watched->u.cli.cl_target_uuid);
1164                 RETURN(rc);
1165         /* We only handle these: */
1166         case OBD_NOTIFY_ACTIVE:
1167                 /* lov want one or more _active_ targets for work */
1168                 mds_allow_cli(obd, CONFIG_TARGET);
1169                 /* activate event should be pass lov idx as argument */
1170         case OBD_NOTIFY_SYNC:
1171         case OBD_NOTIFY_SYNC_NONBLOCK:
1172                 /* sync event should be pass lov idx as argument */
1173                 break;
1174         case OBD_NOTIFY_CONFIG:
1175                 mds_allow_cli(obd, (unsigned long)data);
1176                 /* call this only when config is processed and stale_export_age
1177                  * value is configured */
1178                 class_disconnect_expired_exports(obd);
1179                 /* quota_type has been processed, we can now handle
1180                  * incoming quota requests */
1181                 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
1182         default:
1183                 RETURN(0);
1184         }
1185
1186         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1187                 CERROR("unexpected notification of %s %s!\n",
1188                        watched->obd_type->typ_name, watched->obd_name);
1189                 RETURN(-EINVAL);
1190         }
1191
1192         if (obd->obd_recovering) {
1193                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1194                       obd->obd_name,
1195                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1196                 mds_allow_cli(obd, CONFIG_SYNC);
1197                 RETURN(0);
1198         }
1199
1200         rc = mds_lov_start_synchronize(obd, watched, data,
1201                                        !(ev == OBD_NOTIFY_SYNC));
1202
1203         if (likely(obd->obd_stopping == 0))
1204                 lquota_recovery(mds_quota_interface_ref, obd);
1205
1206         RETURN(rc);
1207 }
1208
1209 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1210                        int *size)
1211 {
1212         struct lov_desc *ldesc;
1213         ENTRY;
1214
1215         ldesc = &obd->u.mds.mds_lov_desc;
1216         LASSERT(ldesc != NULL);
1217
1218         if (!lmm)
1219                 RETURN(0);
1220
1221         lmm->lmm_magic = LOV_MAGIC_V1;
1222         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1223         lmm->lmm_pattern = ldesc->ld_pattern;
1224         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1225         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1226         *size = sizeof(struct lov_mds_md);
1227
1228         RETURN(sizeof(struct lov_mds_md));
1229 }
1230
1231 /* Convert the on-disk LOV EA structre.
1232  * We always try to convert from an old LOV EA format to the common in-memory
1233  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1234  * then convert back to the new on-disk format and save it back to disk
1235  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1236  * to convert it each time this inode is accessed.
1237  *
1238  * This function is a bit interesting in the error handling.  We can safely
1239  * ship the old lmm to the client in case of failure, since it uses the same
1240  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1241  * reason.  We will not delete the old lmm data until we have written the
1242  * new format lmm data in fsfilt_set_md(). */
1243 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1244                        struct lov_mds_md *lmm, int lmm_size,
1245                        __u64 connect_flags)
1246 {
1247         struct lov_stripe_md *lsm = NULL;
1248         void *handle;
1249         int rc, err;
1250         ENTRY;
1251
1252         if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1253             (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1254                 /* client does not support LOV_MAGIC_V3, so we have to convert
1255                  * it to V1
1256                  * we convert the lmm from v3 to v1
1257                  * and return the new size (which is smaller)
1258                  * the caller supports this way to return the new size */
1259                 int new_lmm_size;
1260
1261                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1262                 /* lmm_stripe_count for non reg files is not used or -1 */
1263                 if (!S_ISREG(inode->i_mode)) {
1264                         new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1265                 } else {
1266                         __u32 lmm_stripe_count;
1267
1268                         lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1269                         new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1270                                                        LOV_MAGIC_V1);
1271                         /* move the objects to the new place */
1272                         memmove(lmm->lmm_objects,
1273                                 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1274                                 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1275                 }
1276                 /* even if new size is smaller than old one,
1277                  * this should not generate memory leak */
1278                 RETURN(new_lmm_size);
1279         }
1280
1281         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1282             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1283             le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1284                 RETURN(0);
1285
1286         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1287                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1288                LOV_MAGIC);
1289
1290         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1291         if (rc < 0)
1292                 GOTO(conv_end, rc);
1293
1294         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1295         if (rc < 0)
1296                 GOTO(conv_free, rc);
1297         lmm_size = rc;
1298
1299         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1300         if (IS_ERR(handle)) {
1301                 rc = PTR_ERR(handle);
1302                 GOTO(conv_free, rc);
1303         }
1304
1305         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1306
1307         err = fsfilt_commit(obd, inode, handle, 0);
1308         if (!rc)
1309                 rc = err ? err : lmm_size;
1310         GOTO(conv_free, rc);
1311 conv_free:
1312         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1313 conv_end:
1314         return rc;
1315 }