Whamcloud - gitweb
Prepare for Build 33
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104         if (mds->mds_lov_page_dirty == NULL)
105                 RETURN(-ENOMEM);
106
107
108         OBD_ALLOC(mds->mds_lov_page_array, size);
109         if (mds->mds_lov_page_array == NULL)
110                 GOTO(err_free_bitmap, rc = -ENOMEM);
111
112         /* open and test the lov objd file */
113         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
114         if (IS_ERR(file)) {
115                 rc = PTR_ERR(file);
116                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117                 GOTO(err_free, rc = PTR_ERR(file));
118         }
119         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121                        file->f_dentry->d_inode->i_mode);
122                 GOTO(err_open, rc = -ENOENT);
123         }
124         mds->mds_lov_objid_filp = file;
125
126         RETURN (0);
127 err_open:
128         if (filp_close((struct file *)file, 0))
129                 CERROR("can't close %s after error\n", LOV_OBJID);
130 err_free:
131         OBD_FREE(mds->mds_lov_page_array, size);
132 err_free_bitmap:
133         FREE_BITMAP(mds->mds_lov_page_dirty);
134
135         RETURN(rc);
136 }
137
138 void mds_lov_destroy_objids(struct obd_device *obd)
139 {
140         struct mds_obd *mds = &obd->u.mds;
141         int i, rc;
142         ENTRY;
143
144         if (mds->mds_lov_page_array != NULL) {
145                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146                         obd_id *data = mds->mds_lov_page_array[i];
147                         if (data != NULL)
148                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
149                 }
150                 OBD_FREE(mds->mds_lov_page_array,
151                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
152         }
153
154         if (mds->mds_lov_objid_filp) {
155                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
156                 mds->mds_lov_objid_filp = NULL;
157                 if (rc)
158                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
159         }
160
161         FREE_BITMAP(mds->mds_lov_page_dirty);
162         EXIT;
163 }
164
165 /**
166  * currently exist two ways for know about ost count and max ost index.
167  * first - after ost is connected to mds and sync process finished
168  * second - get from lmm in recovery process, in case when mds not have configs,
169  * and ost isn't registered in mgs.
170  *
171  * \param mds pointer to mds structure
172  * \param index maxium ost index
173  *
174  * \retval -ENOMEM is not hame memory for new page
175  * \retval 0 is update passed
176  */
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
178 {
179         __u32 page = index / OBJID_PER_PAGE();
180         __u32 off = index % OBJID_PER_PAGE();
181         obd_id *data =  mds->mds_lov_page_array[page];
182
183         if (data == NULL) {
184                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
185                 if (data == NULL)
186                         RETURN(-ENOMEM);
187
188                 mds->mds_lov_page_array[page] = data;
189         }
190
191         if (index > mds->mds_lov_objid_max_index) {
192                 mds->mds_lov_objid_lastpage = page;
193                 mds->mds_lov_objid_lastidx = off;
194                 mds->mds_lov_objid_max_index = index;
195         }
196
197         /* workaround - New target not in objids file; increase mdsize */
198         /* ld_tgt_count is used as the max index everywhere, despite its name. */
199         if (data[off] == 0) {
200                 __u32 stripes;
201
202                 data[off] = 1;
203                 mds->mds_lov_objid_count++;
204                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205                                 mds->mds_lov_objid_count);
206
207                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
209
210                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212                        mds->mds_max_cookiesize);
213         }
214
215         EXIT;
216         return 0;
217 }
218
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
220 {
221         struct lov_ost_data_v1 *data;
222         __u32 count;
223         int rc = 0;
224         __u32 j;
225
226         /* if we create file without objects - lmm is NULL */
227         if (lmm == NULL)
228                 return 0;
229
230         switch (le32_to_cpu(lmm->lmm_magic)) {
231                 case LOV_MAGIC_V1:
232                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
234                         break;
235                 case LOV_MAGIC_V3:
236                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
238                         break;
239                 default:
240                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
241                         RETURN(-EINVAL);
242         }
243
244
245         mutex_down(&obd->obd_dev_sem);
246         for (j = 0; j < count; j++) {
247                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
249                         rc = -ENOMEM;
250                         break;
251                 }
252         }
253         mutex_up(&obd->obd_dev_sem);
254
255         RETURN(rc);
256 }
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
258
259 /*
260  * write llog orphan record about lost ost object,
261  * Special lsm is allocated with single stripe, caller should deallocated it
262  * after use
263  */
264 static int mds_log_lost_precreated(struct obd_device *obd,
265                                    struct lov_stripe_md **lsmp, int *stripes,
266                                    obd_id id, obd_count count, int idx)
267 {
268         struct lov_stripe_md *lsm = *lsmp;
269         int rc;
270         ENTRY;
271
272         if (*lsmp == NULL) {
273                 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
274                 if (rc < 0)
275                         RETURN(rc);
276                 /* need only one stripe, save old value */
277                 *stripes = lsm->lsm_stripe_count;
278                 lsm->lsm_stripe_count = 1;
279                 *lsmp = lsm;
280         }
281
282         lsm->lsm_oinfo[0]->loi_id = id;
283         lsm->lsm_oinfo[0]->loi_gr = mdt_to_obd_objgrp(obd->u.mds.mds_id);
284         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
285
286         rc = mds_log_op_orphan(obd, lsm, count);
287         RETURN(rc);
288 }
289
290 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
291 {
292         struct mds_obd *mds = &obd->u.mds;
293         int j;
294         struct lov_ost_data_v1 *obj;
295         struct lov_stripe_md *lsm = NULL;
296         int stripes = 0;
297         int count;
298         ENTRY;
299
300         /* if we create file without objects - lmm is NULL */
301         if (lmm == NULL)
302                 return;
303
304         switch (le32_to_cpu(lmm->lmm_magic)) {
305                 case LOV_MAGIC_V1:
306                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
307                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
308                         break;
309                 case LOV_MAGIC_V3:
310                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
311                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
312                         break;
313                 default:
314                         CERROR("Unknow lmm type %X !\n",
315                                le32_to_cpu(lmm->lmm_magic));
316                         return;
317         }
318
319         for (j = 0; j < count; j++) {
320                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
321                 obd_id id = le64_to_cpu(obj[j].l_object_id);
322                 __u32 page = i / OBJID_PER_PAGE();
323                 __u32 idx = i % OBJID_PER_PAGE();
324                 obd_id *data;
325
326                 data = mds->mds_lov_page_array[page];
327
328                 CDEBUG(D_INODE,"update last object for ost %u"
329                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
330                 if (id > data[idx]) {
331                         int lost = id - data[idx] - 1;
332                         /* we might have lost precreated objects due to VBR */
333                         if (lost > 0 && obd->obd_recovering) {
334                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
335                                 if (!obd->obd_version_recov)
336                                         CERROR("Unexpected gap in objids\n");
337                                 /* lsm is allocated if NULL */
338                                 mds_log_lost_precreated(obd, &lsm, &stripes,
339                                                         data[idx]+1, lost, i);
340                         }
341                         data[idx] = id;
342                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
343                 }
344         }
345         if (lsm) {
346                 /* restore stripes number */
347                 lsm->lsm_stripe_count = stripes;
348                 obd_free_memmd(mds->mds_osc_exp, &lsm);
349         }
350         EXIT;
351         return;
352 }
353 EXPORT_SYMBOL(mds_lov_update_objids);
354
355 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
356                                     __u32 count)
357 {
358         __u32 i;
359         __u32 stripes;
360
361         for (i = 0; i < count; i++) {
362                 if (data[i] == 0)
363                         continue;
364
365                 mds->mds_lov_objid_count++;
366         }
367
368         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369                          mds->mds_lov_objid_count);
370
371         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
372         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373
374         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
375                "%d/%d\n", stripes, mds->mds_max_mdsize,
376                mds->mds_max_cookiesize);
377
378         EXIT;
379         return 0;
380 }
381
382 static int mds_lov_read_objids(struct obd_device *obd)
383 {
384         struct mds_obd *mds = &obd->u.mds;
385         loff_t off = 0;
386         int i, rc = 0, count = 0, page = 0;
387         unsigned long size;
388         ENTRY;
389
390         /* Read everything in the file, even if our current lov desc
391            has fewer targets. Old targets not in the lov descriptor
392            during mds setup may still have valid objids. */
393         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
394         if (size == 0)
395                 RETURN(0);
396
397         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
398         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
399         for (i = 0; i < page; i++) {
400                 obd_id *data;
401                 loff_t off_old = off;
402
403                 LASSERT(mds->mds_lov_page_array[i] == NULL);
404                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
405                 if (mds->mds_lov_page_array[i] == NULL)
406                         GOTO(out, rc = -ENOMEM);
407
408                 data = mds->mds_lov_page_array[i];
409
410                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
411                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
412                 if (rc < 0) {
413                         CERROR("Error reading objids %d\n", rc);
414                         GOTO(out, rc);
415                 }
416
417                 count += (off - off_old) / sizeof(obd_id);
418                 if (mds_lov_update_from_read(mds, data, count)) {
419                         CERROR("Can't update mds data\n");
420                         GOTO(out, rc = -EIO);
421                 }
422
423                 if (off == off_old)
424                         break; /* eof */
425         }
426         mds->mds_lov_objid_lastpage = i;
427         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
428
429         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
430                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
431 out:
432         mds_lov_dump_objids("read",obd);
433
434         RETURN(rc);
435 }
436
437 int mds_lov_write_objids(struct obd_device *obd)
438 {
439         struct mds_obd *mds = &obd->u.mds;
440         int i = 0, rc = 0;
441         ENTRY;
442
443         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
444                 RETURN(0);
445
446         mds_lov_dump_objids("write", obd);
447
448         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
449                 obd_id *data =  mds->mds_lov_page_array[i];
450                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
451                 loff_t off = i * size;
452
453                 LASSERT(data != NULL);
454
455                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
456                         continue;
457
458                 /* check for particaly filled last page */
459                 if (i == mds->mds_lov_objid_lastpage)
460                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
461
462                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
463                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
464                                          size, &off, 0);
465                 if (rc < 0) {
466                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
467                         break;
468                 }
469         }
470         if (rc >= 0)
471                 rc = 0;
472
473         RETURN(rc);
474 }
475 EXPORT_SYMBOL(mds_lov_write_objids);
476
477 static int mds_lov_get_objid(struct obd_device * obd,
478                              obd_id idx)
479 {
480         struct mds_obd *mds = &obd->u.mds;
481         unsigned int page;
482         unsigned int off;
483         obd_id *data;
484         int rc = 0;
485         ENTRY;
486
487         page = idx / OBJID_PER_PAGE();
488         off = idx % OBJID_PER_PAGE();
489         data = mds->mds_lov_page_array[page];
490         if (data[off] < 2) {
491                 /* We never read this lastid; ask the osc */
492                 struct obd_id_info lastid;
493                 __u32 size = sizeof(lastid);
494
495                 lastid.idx = idx;
496                 lastid.data = &data[off];
497                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
498                                   KEY_LAST_ID, &size, &lastid, NULL);
499                 if (rc)
500                         GOTO(out, rc);
501
502                 /* workaround for clean filter */
503                 if (data[off] == 0)
504                         data[off] = 1;
505
506                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
507         }
508         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
509                idx, data, page, off, data[off]);
510 out:
511         RETURN(rc);
512 }
513
514 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
515 {
516         int rc;
517         struct obdo oa = { 0 };
518         struct obd_trans_info oti = {0};
519         struct lov_stripe_md  *empty_ea = NULL;
520         ENTRY;
521
522         LASSERT(mds->mds_lov_page_array != NULL);
523
524         /* This create will in fact either create or destroy:  If the OST is
525          * missing objects below this ID, they will be created.  If it finds
526          * objects above this ID, they will be removed. */
527         memset(&oa, 0, sizeof(oa));
528         oa.o_flags = OBD_FL_DELORPHAN;
529         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
530         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
531         if (ost_uuid != NULL)
532                 oti.oti_ost_uuid = ost_uuid;
533
534         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
535
536         RETURN(rc);
537 }
538
539 /* for one target */
540 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
541 {
542         struct mds_obd *mds = &obd->u.mds;
543         int rc;
544         struct obd_id_info info;
545         ENTRY;
546
547         LASSERT(!obd->obd_recovering);
548
549         info.idx = idx;
550         info.data = id;
551         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
552                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
553         if (rc)
554                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
555                         obd->obd_name, rc);
556
557         RETURN(rc);
558 }
559
560 /* Update the lov desc for a new size lov. */
561 static int mds_lov_update_desc(struct obd_device *obd, int idx,
562                                struct obd_uuid *uuid, enum obd_notify_event ev)
563 {
564         struct mds_obd *mds = &obd->u.mds;
565         struct lov_desc *ld;
566         __u32 valsize = sizeof(mds->mds_lov_desc);
567         int rc = 0;
568         ENTRY;
569
570         OBD_ALLOC(ld, sizeof(*ld));
571         if (!ld)
572                 RETURN(-ENOMEM);
573
574         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
575                           &valsize, ld, NULL);
576         if (rc)
577                 GOTO(out, rc);
578
579         /* Don't change the mds_lov_desc until the objids size matches the
580            count (paranoia) */
581         mds->mds_lov_desc = *ld;
582         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
583                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
584
585         mutex_down(&obd->obd_dev_sem);
586         rc = mds_lov_update_max_ost(mds, idx);
587         mutex_up(&obd->obd_dev_sem);
588         if (rc != 0)
589                 GOTO(out, rc );
590
591         /* If we added a target we have to reconnect the llogs */
592         /* We only _need_ to do this at first add (idx), or the first time
593            after recovery.  However, it should now be safe to call anytime. */
594         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
595         if (rc)
596                 GOTO(out, rc);
597
598         /*XXX this notifies the MDD until lov handling use old mds code */
599         if (obd->obd_upcall.onu_owner) {
600                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
601                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
602                                                  obd->obd_upcall.onu_owner,
603                                                  &mds->mds_mount_count);
604         }
605 out:
606         OBD_FREE(ld, sizeof(*ld));
607         RETURN(rc);
608 }
609
610 /* Inform MDS about new/updated target */
611 static int mds_lov_update_mds(struct obd_device *obd,
612                               struct obd_device *watched,
613                               __u32 idx, enum obd_notify_event ev)
614 {
615         struct mds_obd *mds = &obd->u.mds;
616         int rc = 0;
617         int page;
618         int off;
619         obd_id *data;
620
621         ENTRY;
622
623         /* Don't let anyone else mess with mds_lov_objids now */
624         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
625         if (rc)
626                 GOTO(out, rc);
627
628         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
629                idx, obd->obd_recovering, obd->obd_async_recov,
630                mds->mds_lov_desc.ld_tgt_count);
631
632         /* idx is set as data from lov_notify. */
633         if (obd->obd_recovering)
634                 GOTO(out, rc);
635
636         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
637                 CERROR("index %d > count %d!\n", idx,
638                        mds->mds_lov_desc.ld_tgt_count);
639                 GOTO(out, rc = -EINVAL);
640         }
641
642         rc = mds_lov_get_objid(obd, idx);
643         if (rc)
644                 GOTO(out, rc);
645
646         page = idx / OBJID_PER_PAGE();
647         off = idx % OBJID_PER_PAGE();
648         data = mds->mds_lov_page_array[page];
649
650         /* We have read this lastid from disk; tell the osc.
651            Don't call this during recovery. */
652         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
653         if (rc) {
654                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
655                 /* Don't abort the rest of the sync */
656                 rc = 0;
657         } else {
658                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
659                         data[off], idx, rc);
660         }
661 out:
662         RETURN(rc);
663 }
664
665 /* update the LOV-OSC knowledge of the last used object id's */
666 int mds_lov_connect(struct obd_device *obd, char * lov_name)
667 {
668         struct mds_obd *mds = &obd->u.mds;
669         struct obd_connect_data *data;
670         int rc;
671         ENTRY;
672
673         if (IS_ERR(mds->mds_osc_obd))
674                 RETURN(PTR_ERR(mds->mds_osc_obd));
675
676         if (mds->mds_osc_obd)
677                 RETURN(0);
678
679         mds->mds_osc_obd = class_name2obd(lov_name);
680         if (!mds->mds_osc_obd) {
681                 CERROR("MDS cannot locate LOV %s\n", lov_name);
682                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
683                 RETURN(-ENOTCONN);
684         }
685
686         mutex_down(&obd->obd_dev_sem);
687         rc = mds_lov_read_objids(obd);
688         mutex_up(&obd->obd_dev_sem);
689         if (rc) {
690                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
691                 GOTO(err_exit, rc);
692         }
693
694         rc = obd_register_observer(mds->mds_osc_obd, obd);
695         if (rc) {
696                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
697                        lov_name, rc);
698                 GOTO(err_exit, rc);
699         }
700
701         /* try init too early */
702         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
703         if (rc)
704                 GOTO(err_exit, rc);
705
706         mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
707
708         OBD_ALLOC(data, sizeof(*data));
709         if (data == NULL)
710                 GOTO(err_exit, rc = -ENOMEM);
711
712         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
713                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
714                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
715                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
716                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
717                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
718                                   OBD_CONNECT_SOM;
719 #ifdef HAVE_LRU_RESIZE_SUPPORT
720         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
721 #endif
722         data->ocd_version = LUSTRE_VERSION_CODE;
723         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
724         /* send max bytes per rpc */
725         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
726         /* send the list of supported checksum types */
727         data->ocd_cksum_types = OBD_CKSUM_ALL;
728         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
729         rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
730         OBD_FREE(data, sizeof(*data));
731         if (rc) {
732                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
733                 mds->mds_osc_obd = ERR_PTR(rc);
734                 RETURN(rc);
735         }
736
737         /* I want to see a callback happen when the OBD moves to a
738          * "For General Use" state, and that's when we'll call
739          * set_nextid().  The class driver can help us here, because
740          * it can use the obd_recovering flag to determine when the
741          * the OBD is full available. */
742         /* MDD device will care about that
743         if (!obd->obd_recovering)
744                 rc = mds_postrecov(obd);
745          */
746         RETURN(rc);
747
748 err_exit:
749         mds->mds_osc_exp = NULL;
750         mds->mds_osc_obd = ERR_PTR(rc);
751         RETURN(rc);
752 }
753
754 int mds_lov_disconnect(struct obd_device *obd)
755 {
756         struct mds_obd *mds = &obd->u.mds;
757         int rc = 0;
758         ENTRY;
759
760         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
761                 obd_register_observer(mds->mds_osc_obd, NULL);
762
763                 /* The actual disconnect of the mds_lov will be called from
764                  * class_disconnect_exports from mds_lov_clean. So we have to
765                  * ensure that class_cleanup doesn't fail due to the extra ref
766                  * we're holding now. The mechanism to do that already exists -
767                  * the obd_force flag. We'll drop the final ref to the
768                  * mds_osc_exp in mds_cleanup. */
769                 mds->mds_osc_obd->obd_force = 1;
770         }
771
772         RETURN(rc);
773 }
774
775 struct mds_lov_sync_info {
776         struct obd_device    *mlsi_obd;     /* the lov device to sync */
777         struct obd_device    *mlsi_watched; /* target osc */
778         __u32                 mlsi_index;   /* index of target */
779         enum obd_notify_event mlsi_ev;      /* event type */
780 };
781
782 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
783 {
784         struct mds_capa_info    info = { .uuid = uuid };
785         struct lustre_capa_key *key;
786         int i, rc = 0;
787
788         ENTRY;
789
790         if (!mds->mds_capa_keys)
791                 RETURN(0);
792
793         for (i = 0; i < 2; i++) {
794                 key = &mds->mds_capa_keys[i];
795                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
796
797                 info.capa = key;
798                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
799                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
800                 if (rc) {
801                         DEBUG_CAPA_KEY(D_ERROR, key,
802                                        "propagate failed (rc = %d) for", rc);
803                         RETURN(rc);
804                 }
805         }
806
807         RETURN(0);
808 }
809
810 /* We only sync one osc at a time, so that we don't have to hold
811    any kind of lock on the whole mds_lov_desc, which may change
812    (grow) as a result of mds_lov_add_ost.  This also avoids any
813    kind of mismatch between the lov_desc and the mds_lov_desc,
814    which are not in lock-step during lov_add_obd */
815 static int __mds_lov_synchronize(void *data)
816 {
817         struct mds_lov_sync_info *mlsi = data;
818         struct obd_device *obd = mlsi->mlsi_obd;
819         struct obd_device *watched = mlsi->mlsi_watched;
820         struct mds_obd *mds = &obd->u.mds;
821         struct obd_uuid *uuid;
822         __u32  idx = mlsi->mlsi_index;
823         enum obd_notify_event ev = mlsi->mlsi_ev;
824         struct mds_group_info mgi;
825         struct llog_ctxt *ctxt;
826         int rc = 0;
827         ENTRY;
828
829         OBD_FREE_PTR(mlsi);
830
831         LASSERT(obd);
832         LASSERT(watched);
833         uuid = &watched->u.cli.cl_target_uuid;
834         LASSERT(uuid);
835
836         down_read(&mds->mds_notify_lock);
837         if (obd->obd_stopping || obd->obd_fail)
838                 GOTO(out, rc = -ENODEV);
839
840         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
841         rc = mds_lov_update_mds(obd, watched, idx, ev);
842         if (rc != 0) {
843                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
844                 GOTO(out, rc);
845         }
846         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
847         mgi.uuid = uuid;
848
849         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
850                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
851         if (rc != 0)
852                 GOTO(out, rc);
853         /* propagate capability keys */
854         rc = mds_propagate_capa_keys(mds, uuid);
855         if (rc)
856                 GOTO(out, rc);
857
858         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
859         if (!ctxt)
860                 GOTO(out, rc = -ENODEV);
861
862         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
863         rc = llog_connect(ctxt, NULL, NULL, uuid);
864         llog_ctxt_put(ctxt);
865         if (rc != 0) {
866                 CERROR("%s failed at llog_origin_connect: %d\n",
867                        obd_uuid2str(uuid), rc);
868                 GOTO(out, rc);
869         }
870
871         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
872               obd->obd_name, obd_uuid2str(uuid));
873         rc = mds_lov_clear_orphans(mds, uuid);
874         if (rc != 0) {
875                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
876                        obd_uuid2str(uuid), rc);
877                 GOTO(out, rc);
878         }
879
880 #ifdef HAVE_QUOTA_SUPPORT
881         if (obd->obd_upcall.onu_owner) { 
882                 /*
883                  * This is a hack for mds_notify->mdd_notify. When the mds obd
884                  * in mdd is removed, This hack should be removed.
885                  */
886                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
887                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
888                                                 obd->obd_upcall.onu_owner,NULL);
889         }
890 #endif
891         EXIT;
892 out:
893         up_read(&mds->mds_notify_lock);
894         if (rc) {
895                 /* Deactivate it for safety */
896                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
897                        rc);
898                 if (!obd->obd_stopping && mds->mds_osc_obd &&
899                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
900                         obd_notify(mds->mds_osc_obd, watched,
901                                    OBD_NOTIFY_INACTIVE, NULL);
902         }
903
904         class_decref(obd, "mds_lov_synchronize", obd);
905         return rc;
906 }
907
908 int mds_lov_synchronize(void *data)
909 {
910         struct mds_lov_sync_info *mlsi = data;
911         char name[20];
912
913         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
914         cfs_daemonize_ctxt(name);
915
916         RETURN(__mds_lov_synchronize(data));
917 }
918
919 int mds_lov_start_synchronize(struct obd_device *obd,
920                               struct obd_device *watched,
921                               void *data, enum obd_notify_event ev)
922 {
923         struct mds_lov_sync_info *mlsi;
924         int rc;
925         struct obd_uuid *uuid;
926         ENTRY;
927
928         LASSERT(watched);
929         uuid = &watched->u.cli.cl_target_uuid;
930
931         OBD_ALLOC(mlsi, sizeof(*mlsi));
932         if (mlsi == NULL)
933                 RETURN(-ENOMEM);
934
935         LASSERT(data);
936         mlsi->mlsi_obd = obd;
937         mlsi->mlsi_watched = watched;
938         mlsi->mlsi_index = *(__u32 *)data;
939         mlsi->mlsi_ev = ev;
940
941         /* Although class_export_get(obd->obd_self_export) would lock
942            the MDS in place, since it's only a self-export
943            it doesn't lock the LOV in place.  The LOV can be disconnected
944            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
945            Simply taking an export ref on the LOV doesn't help, because it's
946            still disconnected. Taking an obd reference insures that we don't
947            disconnect the LOV.  This of course means a cleanup won't
948            finish for as long as the sync is blocking. */
949         class_incref(obd, "mds_lov_synchronize", obd);
950
951         if (ev != OBD_NOTIFY_SYNC) {
952                 /* Synchronize in the background */
953                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
954                                        CLONE_VM | CLONE_FILES);
955                 if (rc < 0) {
956                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
957                                obd->obd_name, rc);
958                         class_decref(obd, "mds_lov_synchronize", obd);
959                 } else {
960                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
961                                "thread=%d\n", obd->obd_name,
962                                mlsi->mlsi_index, rc);
963                         rc = 0;
964                 }
965         } else {
966                 rc = __mds_lov_synchronize((void *)mlsi);
967         }
968
969         RETURN(rc);
970 }
971
972 int mds_notify(struct obd_device *obd, struct obd_device *watched,
973                enum obd_notify_event ev, void *data)
974 {
975         int rc = 0;
976         ENTRY;
977
978         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
979
980         switch (ev) {
981         /* We only handle these: */
982         case OBD_NOTIFY_ACTIVE:
983                 /* lov want one or more _active_ targets for work */
984                 /* activate event should be pass lov idx as argument */
985         case OBD_NOTIFY_SYNC:
986         case OBD_NOTIFY_SYNC_NONBLOCK:
987                 /* sync event should be pass lov idx as argument */
988                 break;
989         default:
990                 RETURN(0);
991         }
992
993         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
994                 CERROR("unexpected notification of %s %s!\n",
995                        watched->obd_type->typ_name, watched->obd_name);
996                 RETURN(-EINVAL);
997         }
998
999         if (obd->obd_recovering) {
1000                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1001                       obd->obd_name,
1002                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1003                 /* We still have to fix the lov descriptor for ost's added
1004                    after the mdt in the config log.  They didn't make it into
1005                    mds_lov_connect. */
1006                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1007                                          &watched->u.cli.cl_target_uuid, ev);
1008         } else {
1009                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1010         }
1011         RETURN(rc);
1012 }