Whamcloud - gitweb
95edc0767d5f178fabf5b69f75836732eedbff42
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         if ((libcfs_debug & D_INFO) == 0)
62                 return;
63
64         CDEBUG(D_INFO, "dump from %s\n", label);
65         if (mds->mds_lov_page_dirty == NULL) {
66                 CERROR("NULL bitmap!\n");
67                 GOTO(skip_bitmap, i);
68         }
69
70         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71                 CDEBUG(D_INFO, "%u - %lx\n", i,
72                        mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i, j, data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty =
107                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108         if (mds->mds_lov_page_dirty == NULL)
109                 RETURN(-ENOMEM);
110
111
112         OBD_ALLOC(mds->mds_lov_page_array, size);
113         if (mds->mds_lov_page_array == NULL)
114                 GOTO(err_free_bitmap, rc = -ENOMEM);
115
116         /* open and test the lov objd file */
117         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
118         if (IS_ERR(file)) {
119                 rc = PTR_ERR(file);
120                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121                 GOTO(err_free, rc = PTR_ERR(file));
122         }
123         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125                        file->f_dentry->d_inode->i_mode);
126                 GOTO(err_open, rc = -ENOENT);
127         }
128         mds->mds_lov_objid_filp = file;
129
130         RETURN (0);
131 err_open:
132         if (filp_close((struct file *)file, 0))
133                 CERROR("can't close %s after error\n", LOV_OBJID);
134 err_free:
135         OBD_FREE(mds->mds_lov_page_array, size);
136 err_free_bitmap:
137         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
138
139         RETURN(rc);
140 }
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168
169 /**
170  * currently exist two ways for know about ost count and max ost index.
171  * first - after ost is connected to mds and sync process finished
172  * second - get from lmm in recovery process, in case when mds not have configs,
173  * and ost isn't registered in mgs.
174  *
175  * \param mds pointer to mds structure
176  * \param index maxium ost index
177  *
178  * \retval -ENOMEM is not hame memory for new page
179  * \retval 0 is update passed
180  */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
182 {
183         __u32 page = index / OBJID_PER_PAGE();
184         __u32 off = index % OBJID_PER_PAGE();
185         obd_id *data =  mds->mds_lov_page_array[page];
186
187         if (data == NULL) {
188                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189                 if (data == NULL)
190                         RETURN(-ENOMEM);
191
192                 mds->mds_lov_page_array[page] = data;
193         }
194
195         if (index > mds->mds_lov_objid_max_index) {
196                 mds->mds_lov_objid_lastpage = page;
197                 mds->mds_lov_objid_lastidx = off;
198                 mds->mds_lov_objid_max_index = index;
199         }
200
201         /* workaround - New target not in objids file; increase mdsize */
202         /* ld_tgt_count is used as the max index everywhere, despite its name. */
203         if (data[off] == 0) {
204                 __u32 stripes;
205
206                 data[off] = 1;
207                 mds->mds_lov_objid_count++;
208                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
209                                 mds->mds_lov_objid_count);
210
211                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
212                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
213
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
215                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
224 {
225         __u32 page = index / OBJID_PER_PAGE();
226         __u32 off = index % OBJID_PER_PAGE();
227         obd_id *data =  mds->mds_lov_page_array[page];
228
229         return (data[off] > 0);
230 }
231
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
233 {
234         struct lov_ost_data_v1 *data;
235         __u32 count;
236         int rc = 0;
237         __u32 j;
238
239         /* if we create file without objects - lmm is NULL */
240         if (lmm == NULL)
241                 return 0;
242
243         switch (le32_to_cpu(lmm->lmm_magic)) {
244                 case LOV_MAGIC_V1:
245                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
246                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
247                         break;
248                 case LOV_MAGIC_V3:
249                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
250                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
251                         break;
252                 default:
253                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
254                         RETURN(-EINVAL);
255         }
256
257
258         cfs_mutex_down(&obd->obd_dev_sem);
259         for (j = 0; j < count; j++) {
260                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
261                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
262                         rc = -ENOMEM;
263                         break;
264                 }
265         }
266         cfs_mutex_up(&obd->obd_dev_sem);
267
268         RETURN(rc);
269 }
270 EXPORT_SYMBOL(mds_lov_prepare_objids);
271
272 /*
273  * write llog orphan record about lost ost object,
274  * Special lsm is allocated with single stripe, caller should deallocated it
275  * after use
276  */
277 static int mds_log_lost_precreated(struct obd_device *obd,
278                                    struct lov_stripe_md **lsmp, int *stripes,
279                                    obd_id id, obd_count count, int idx)
280 {
281         struct lov_stripe_md *lsm = *lsmp;
282         int rc;
283         ENTRY;
284
285         if (*lsmp == NULL) {
286                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
287                 if (rc < 0)
288                         RETURN(rc);
289                 /* need only one stripe, save old value */
290                 *stripes = lsm->lsm_stripe_count;
291                 lsm->lsm_stripe_count = 1;
292                 *lsmp = lsm;
293         }
294
295         lsm->lsm_oinfo[0]->loi_id = id;
296         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
297         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
298
299         rc = mds_log_op_orphan(obd, lsm, count);
300         RETURN(rc);
301 }
302
303 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
304 {
305         struct mds_obd *mds = &obd->u.mds;
306         int j;
307         struct lov_ost_data_v1 *obj;
308         struct lov_stripe_md *lsm = NULL;
309         int stripes = 0;
310         int count;
311         ENTRY;
312
313         /* if we create file without objects - lmm is NULL */
314         if (lmm == NULL)
315                 return;
316
317         switch (le32_to_cpu(lmm->lmm_magic)) {
318                 case LOV_MAGIC_V1:
319                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
320                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
321                         break;
322                 case LOV_MAGIC_V3:
323                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
324                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
325                         break;
326                 default:
327                         CERROR("Unknow lmm type %X !\n",
328                                le32_to_cpu(lmm->lmm_magic));
329                         return;
330         }
331
332         for (j = 0; j < count; j++) {
333                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
334                 obd_id id = le64_to_cpu(obj[j].l_object_id);
335                 __u32 page = i / OBJID_PER_PAGE();
336                 __u32 idx = i % OBJID_PER_PAGE();
337                 obd_id *data;
338
339                 data = mds->mds_lov_page_array[page];
340
341                 CDEBUG(D_INODE,"update last object for ost %u"
342                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
343                 if (id > data[idx]) {
344                         int lost = id - data[idx] - 1;
345                         /* we might have lost precreated objects due to VBR */
346                         if (lost > 0 && obd->obd_recovering) {
347                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
348                                 if (!obd->obd_version_recov)
349                                         CERROR("Unexpected gap in objids\n");
350                                 /* lsm is allocated if NULL */
351                                 mds_log_lost_precreated(obd, &lsm, &stripes,
352                                                         data[idx]+1, lost, i);
353                         }
354                         data[idx] = id;
355                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
356                 }
357         }
358         if (lsm) {
359                 /* restore stripes number */
360                 lsm->lsm_stripe_count = stripes;
361                 obd_free_memmd(mds->mds_lov_exp, &lsm);
362         }
363         EXIT;
364         return;
365 }
366 EXPORT_SYMBOL(mds_lov_update_objids);
367
368 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
369                                     __u32 count)
370 {
371         __u32 i;
372         __u32 stripes;
373
374         for (i = 0; i < count; i++) {
375                 if (data[i] == 0)
376                         continue;
377
378                 mds->mds_lov_objid_count++;
379         }
380
381         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
382                          mds->mds_lov_objid_count);
383
384         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
385         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
386
387         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388                "%d/%d\n", stripes, mds->mds_max_mdsize,
389                mds->mds_max_cookiesize);
390
391         EXIT;
392         return 0;
393 }
394
395 static int mds_lov_read_objids(struct obd_device *obd)
396 {
397         struct mds_obd *mds = &obd->u.mds;
398         loff_t off = 0;
399         int i, rc = 0, count = 0, page = 0;
400         unsigned long size;
401         ENTRY;
402
403         /* Read everything in the file, even if our current lov desc
404            has fewer targets. Old targets not in the lov descriptor
405            during mds setup may still have valid objids. */
406         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
407         if (size == 0)
408                 RETURN(0);
409
410         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
411         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
412         for (i = 0; i < page; i++) {
413                 obd_id *data;
414                 loff_t off_old = off;
415
416                 LASSERT(mds->mds_lov_page_array[i] == NULL);
417                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
418                 if (mds->mds_lov_page_array[i] == NULL)
419                         GOTO(out, rc = -ENOMEM);
420
421                 data = mds->mds_lov_page_array[i];
422
423                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
424                                         MDS_LOV_ALLOC_SIZE, &off);
425                 if (rc < 0) {
426                         CERROR("Error reading objids %d\n", rc);
427                         GOTO(out, rc);
428                 }
429                 if (off == off_old) /* hole is read */
430                         off += MDS_LOV_ALLOC_SIZE;
431
432                 count = (off - off_old) / sizeof(obd_id);
433                 if (mds_lov_update_from_read(mds, data, count)) {
434                         CERROR("Can't update mds data\n");
435                         GOTO(out, rc = -EIO);
436                 }
437         }
438         mds->mds_lov_objid_lastpage = page - 1;
439         mds->mds_lov_objid_lastidx = count - 1;
440
441         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
442                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
443 out:
444         mds_lov_dump_objids("read",obd);
445
446         RETURN(rc);
447 }
448
449 int mds_lov_write_objids(struct obd_device *obd)
450 {
451         struct mds_obd *mds = &obd->u.mds;
452         int i = 0, rc = 0;
453         ENTRY;
454
455         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
456                 RETURN(0);
457
458         mds_lov_dump_objids("write", obd);
459
460         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
461                 obd_id *data =  mds->mds_lov_page_array[i];
462                 unsigned int size = MDS_LOV_ALLOC_SIZE;
463                 loff_t off = i * size;
464
465                 LASSERT(data != NULL);
466
467                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
468                         continue;
469
470                 /* check for particaly filled last page */
471                 if (i == mds->mds_lov_objid_lastpage)
472                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
473
474                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
475                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
476                                          size, &off, 0);
477                 if (rc < 0) {
478                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
479                         break;
480                 }
481         }
482         if (rc >= 0)
483                 rc = 0;
484
485         RETURN(rc);
486 }
487 EXPORT_SYMBOL(mds_lov_write_objids);
488
489 static int mds_lov_get_objid(struct obd_device * obd,
490                              obd_id idx)
491 {
492         struct mds_obd *mds = &obd->u.mds;
493         struct obd_export *lov_exp = mds->mds_lov_exp;
494         unsigned int page;
495         unsigned int off;
496         obd_id *data;
497         __u32 size;
498         int rc = 0;
499         ENTRY;
500
501         page = idx / OBJID_PER_PAGE();
502         off = idx % OBJID_PER_PAGE();
503         data = mds->mds_lov_page_array[page];
504
505         if (data[off] < 2) {
506                 /* We never read this lastid; ask the osc */
507                 struct obd_id_info lastid;
508
509                 size = sizeof(lastid);
510                 lastid.idx = idx;
511                 lastid.data = &data[off];
512                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
513                                   &size, &lastid, NULL);
514                 if (rc)
515                         GOTO(out, rc);
516
517                 /* workaround for clean filter */
518                 if (data[off] == 0)
519                         data[off] = 1;
520
521                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
522         }
523         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
524                idx, data, page, off, data[off]);
525 out:
526         RETURN(rc);
527 }
528
529 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
530 {
531         int rc;
532         struct obdo oa = { 0 };
533         struct obd_trans_info oti = {0};
534         struct lov_stripe_md  *empty_ea = NULL;
535         ENTRY;
536
537         LASSERT(mds->mds_lov_page_array != NULL);
538
539         /* This create will in fact either create or destroy:  If the OST is
540          * missing objects below this ID, they will be created.  If it finds
541          * objects above this ID, they will be removed. */
542         memset(&oa, 0, sizeof(oa));
543         oa.o_flags = OBD_FL_DELORPHAN;
544         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
545         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
546         if (ost_uuid != NULL)
547                 oti.oti_ost_uuid = ost_uuid;
548
549         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
550
551         RETURN(rc);
552 }
553
554 /* for one target */
555 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
556 {
557         struct mds_obd *mds = &obd->u.mds;
558         int rc;
559         struct obd_id_info info;
560         ENTRY;
561
562         LASSERT(!obd->obd_recovering);
563
564         info.idx = idx;
565         info.data = id;
566         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
567                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
568         if (rc)
569                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
570                         obd->obd_name, rc);
571
572         RETURN(rc);
573 }
574
575 /* Update the lov desc for a new size lov. */
576 static int mds_lov_update_desc(struct obd_device *obd, int idx,
577                                struct obd_uuid *uuid)
578 {
579         struct mds_obd *mds = &obd->u.mds;
580         struct lov_desc *ld;
581         __u32 valsize = sizeof(mds->mds_lov_desc);
582         int rc = 0;
583         ENTRY;
584
585         OBD_ALLOC(ld, sizeof(*ld));
586         if (!ld)
587                 RETURN(-ENOMEM);
588
589         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
590                           &valsize, ld, NULL);
591         if (rc)
592                 GOTO(out, rc);
593
594         /* Don't change the mds_lov_desc until the objids size matches the
595            count (paranoia) */
596         mds->mds_lov_desc = *ld;
597         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
598                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
599
600         cfs_mutex_down(&obd->obd_dev_sem);
601         rc = mds_lov_update_max_ost(mds, idx);
602         cfs_mutex_up(&obd->obd_dev_sem);
603         if (rc != 0)
604                 GOTO(out, rc );
605
606         /* If we added a target we have to reconnect the llogs */
607         /* We only _need_ to do this at first add (idx), or the first time
608            after recovery.  However, it should now be safe to call anytime. */
609         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
610         if (rc)
611                 GOTO(out, rc);
612
613 out:
614         OBD_FREE(ld, sizeof(*ld));
615         RETURN(rc);
616 }
617
618 /* Inform MDS about new/updated target */
619 static int mds_lov_update_mds(struct obd_device *obd,
620                               struct obd_device *watched,
621                               __u32 idx)
622 {
623         struct mds_obd *mds = &obd->u.mds;
624         int rc = 0;
625         int page;
626         int off;
627         obd_id *data;
628         ENTRY;
629
630         LASSERT(mds_lov_objinit(mds, idx));
631
632         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
633                idx, obd->obd_recovering, obd->obd_async_recov,
634                mds->mds_lov_desc.ld_tgt_count);
635
636         /* idx is set as data from lov_notify. */
637         if (obd->obd_recovering)
638                 GOTO(out, rc);
639
640         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
641                 CERROR("index %d > count %d!\n", idx,
642                        mds->mds_lov_desc.ld_tgt_count);
643                 GOTO(out, rc = -EINVAL);
644         }
645
646         rc = mds_lov_get_objid(obd, idx);
647         if (rc)
648                 GOTO(out, rc);
649
650         page = idx / OBJID_PER_PAGE();
651         off = idx % OBJID_PER_PAGE();
652         data = mds->mds_lov_page_array[page];
653
654         /* We have read this lastid from disk; tell the osc.
655            Don't call this during recovery. */
656         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
657         if (rc) {
658                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
659                 /* Don't abort the rest of the sync */
660                 rc = 0;
661         } else {
662                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
663                         data[off], idx, rc);
664         }
665 out:
666         RETURN(rc);
667 }
668
669 /* update the LOV-OSC knowledge of the last used object id's */
670 int mds_lov_connect(struct obd_device *obd, char * lov_name)
671 {
672         struct mds_obd *mds = &obd->u.mds;
673         struct obd_connect_data *data;
674         int rc;
675         ENTRY;
676
677         if (IS_ERR(mds->mds_lov_obd))
678                 RETURN(PTR_ERR(mds->mds_lov_obd));
679
680         if (mds->mds_lov_obd)
681                 RETURN(0);
682
683         mds->mds_lov_obd = class_name2obd(lov_name);
684         if (!mds->mds_lov_obd) {
685                 CERROR("MDS cannot locate LOV %s\n", lov_name);
686                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
687                 RETURN(-ENOTCONN);
688         }
689
690         cfs_mutex_down(&obd->obd_dev_sem);
691         rc = mds_lov_read_objids(obd);
692         cfs_mutex_up(&obd->obd_dev_sem);
693         if (rc) {
694                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
695                 GOTO(err_exit, rc);
696         }
697
698         rc = obd_register_observer(mds->mds_lov_obd, obd);
699         if (rc) {
700                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
701                        lov_name, rc);
702                 GOTO(err_exit, rc);
703         }
704
705         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
706          * targets */
707         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
708
709         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
710
711         OBD_ALLOC(data, sizeof(*data));
712         if (data == NULL)
713                 GOTO(err_exit, rc = -ENOMEM);
714
715         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
716                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
717                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
718                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
719                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
720                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
721                                   OBD_CONNECT_SOM | OBD_CONNECT_FULL20 |
722                                   OBD_CONNECT_64BITHASH;
723 #ifdef HAVE_LRU_RESIZE_SUPPORT
724         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
725 #endif
726         data->ocd_version = LUSTRE_VERSION_CODE;
727         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
728         /* send max bytes per rpc */
729         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
730         /* send the list of supported checksum types */
731         data->ocd_cksum_types = OBD_CKSUM_ALL;
732         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
733         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
734         OBD_FREE(data, sizeof(*data));
735         if (rc) {
736                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
737                 mds->mds_lov_obd = ERR_PTR(rc);
738                 RETURN(rc);
739         }
740
741         /* I want to see a callback happen when the OBD moves to a
742          * "For General Use" state, and that's when we'll call
743          * set_nextid().  The class driver can help us here, because
744          * it can use the obd_recovering flag to determine when the
745          * the OBD is full available. */
746         /* MDD device will care about that
747         if (!obd->obd_recovering)
748                 rc = mds_postrecov(obd);
749          */
750         RETURN(rc);
751
752 err_exit:
753         mds->mds_lov_exp = NULL;
754         mds->mds_lov_obd = ERR_PTR(rc);
755         RETURN(rc);
756 }
757
758 int mds_lov_disconnect(struct obd_device *obd)
759 {
760         struct mds_obd *mds = &obd->u.mds;
761         int rc = 0;
762         ENTRY;
763
764         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
765                 obd_register_observer(mds->mds_lov_obd, NULL);
766
767                 /* The actual disconnect of the mds_lov will be called from
768                  * class_disconnect_exports from mds_lov_clean. So we have to
769                  * ensure that class_cleanup doesn't fail due to the extra ref
770                  * we're holding now. The mechanism to do that already exists -
771                  * the obd_force flag. We'll drop the final ref to the
772                  * mds_lov_exp in mds_cleanup. */
773                 mds->mds_lov_obd->obd_force = 1;
774         }
775
776         RETURN(rc);
777 }
778
779 struct mds_lov_sync_info {
780         struct obd_device    *mlsi_obd;     /* the lov device to sync */
781         struct obd_device    *mlsi_watched; /* target osc */
782         __u32                 mlsi_index;   /* index of target */
783 };
784
785 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
786 {
787         struct mds_capa_info    info = { .uuid = uuid };
788         struct lustre_capa_key *key;
789         int i, rc = 0;
790
791         ENTRY;
792
793         if (!mds->mds_capa_keys)
794                 RETURN(0);
795
796         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
797         for (i = 0; i < 2; i++) {
798                 key = &mds->mds_capa_keys[i];
799                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
800
801                 info.capa = key;
802                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
803                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
804                 if (rc) {
805                         DEBUG_CAPA_KEY(D_ERROR, key,
806                                        "propagate failed (rc = %d) for", rc);
807                         RETURN(rc);
808                 }
809         }
810
811         RETURN(0);
812 }
813
814 /* We only sync one osc at a time, so that we don't have to hold
815    any kind of lock on the whole mds_lov_desc, which may change
816    (grow) as a result of mds_lov_add_ost.  This also avoids any
817    kind of mismatch between the lov_desc and the mds_lov_desc,
818    which are not in lock-step during lov_add_obd */
819 static int __mds_lov_synchronize(void *data)
820 {
821         struct mds_lov_sync_info *mlsi = data;
822         struct obd_device *obd = mlsi->mlsi_obd;
823         struct obd_device *watched = mlsi->mlsi_watched;
824         struct mds_obd *mds = &obd->u.mds;
825         struct obd_uuid *uuid;
826         __u32  idx = mlsi->mlsi_index;
827         struct mds_group_info mgi;
828         struct llog_ctxt *ctxt;
829         int rc = 0;
830         ENTRY;
831
832         OBD_FREE_PTR(mlsi);
833
834         LASSERT(obd);
835         LASSERT(watched);
836         uuid = &watched->u.cli.cl_target_uuid;
837         LASSERT(uuid);
838
839         cfs_down_read(&mds->mds_notify_lock);
840         if (obd->obd_stopping || obd->obd_fail)
841                 GOTO(out, rc = -ENODEV);
842
843         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
844         rc = mds_lov_update_mds(obd, watched, idx);
845         if (rc != 0) {
846                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
847                 GOTO(out, rc);
848         }
849         mgi.group = mdt_to_obd_objseq(mds->mds_id);
850         mgi.uuid = uuid;
851
852         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
853                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
854         if (rc != 0)
855                 GOTO(out, rc);
856         /* propagate capability keys */
857         rc = mds_propagate_capa_keys(mds, uuid);
858         if (rc)
859                 GOTO(out, rc);
860
861         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
862         if (!ctxt)
863                 GOTO(out, rc = -ENODEV);
864
865         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
866         rc = llog_connect(ctxt, NULL, NULL, uuid);
867         llog_ctxt_put(ctxt);
868         if (rc != 0) {
869                 CERROR("%s failed at llog_origin_connect: %d\n",
870                        obd_uuid2str(uuid), rc);
871                 GOTO(out, rc);
872         }
873
874         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
875               obd->obd_name, obd_uuid2str(uuid));
876
877         rc = mds_lov_clear_orphans(mds, uuid);
878         if (rc != 0) {
879                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
880                        obd_uuid2str(uuid), rc);
881                 GOTO(out, rc);
882         }
883
884 #ifdef HAVE_QUOTA_SUPPORT
885         if (obd->obd_upcall.onu_owner) {
886                 /*
887                  * This is a hack for mds_notify->mdd_notify. When the mds obd
888                  * in mdd is removed, This hack should be removed.
889                  */
890                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
891                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
892                                                 obd->obd_upcall.onu_owner,NULL);
893         }
894 #endif
895         EXIT;
896 out:
897         cfs_up_read(&mds->mds_notify_lock);
898         if (rc) {
899                 /* Deactivate it for safety */
900                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
901                        rc);
902                 if (!obd->obd_stopping && mds->mds_lov_obd &&
903                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
904                         obd_notify(mds->mds_lov_obd, watched,
905                                    OBD_NOTIFY_INACTIVE, NULL);
906         }
907
908         class_decref(obd, "mds_lov_synchronize", obd);
909         return rc;
910 }
911
912 int mds_lov_synchronize(void *data)
913 {
914         struct mds_lov_sync_info *mlsi = data;
915         char name[20];
916
917         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
918         cfs_daemonize_ctxt(name);
919
920         RETURN(__mds_lov_synchronize(data));
921 }
922
923 int mds_lov_start_synchronize(struct obd_device *obd,
924                               struct obd_device *watched,
925                               void *data, enum obd_notify_event ev)
926 {
927         struct mds_lov_sync_info *mlsi;
928         int rc;
929         struct obd_uuid *uuid;
930         ENTRY;
931
932         LASSERT(watched);
933         uuid = &watched->u.cli.cl_target_uuid;
934
935         OBD_ALLOC(mlsi, sizeof(*mlsi));
936         if (mlsi == NULL)
937                 RETURN(-ENOMEM);
938
939         LASSERT(data);
940         mlsi->mlsi_obd = obd;
941         mlsi->mlsi_watched = watched;
942         mlsi->mlsi_index = *(__u32 *)data;
943
944         /* Although class_export_get(obd->obd_self_export) would lock
945            the MDS in place, since it's only a self-export
946            it doesn't lock the LOV in place.  The LOV can be disconnected
947            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
948            Simply taking an export ref on the LOV doesn't help, because it's
949            still disconnected. Taking an obd reference insures that we don't
950            disconnect the LOV.  This of course means a cleanup won't
951            finish for as long as the sync is blocking. */
952         class_incref(obd, "mds_lov_synchronize", obd);
953
954         if (ev != OBD_NOTIFY_SYNC) {
955                 /* Synchronize in the background */
956                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
957                                        CFS_DAEMON_FLAGS);
958                 if (rc < 0) {
959                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
960                                obd->obd_name, rc);
961                         class_decref(obd, "mds_lov_synchronize", obd);
962                 } else {
963                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
964                                "thread=%d\n", obd->obd_name,
965                                mlsi->mlsi_index, rc);
966                         rc = 0;
967                 }
968         } else {
969                 rc = __mds_lov_synchronize((void *)mlsi);
970         }
971
972         RETURN(rc);
973 }
974
975 int mds_notify(struct obd_device *obd, struct obd_device *watched,
976                enum obd_notify_event ev, void *data)
977 {
978         struct mds_obd *mds = &obd->u.mds;
979         int rc = 0;
980         ENTRY;
981
982         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
983
984         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
985                 CERROR("unexpected notification of %s %s!\n",
986                        watched->obd_type->typ_name, watched->obd_name);
987                 RETURN(-EINVAL);
988         }
989
990         /*XXX this notifies the MDD until lov handling use old mds code
991          * must non block!
992          */
993         if (obd->obd_upcall.onu_owner) {
994                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
995                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
996                                                  obd->obd_upcall.onu_owner,
997                                                  &mds->mds_obt.obt_mount_count);
998         }
999
1000         switch (ev) {
1001         /* We only handle these: */
1002         case OBD_NOTIFY_CREATE:
1003                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1004                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1005                 /* We still have to fix the lov descriptor for ost's */
1006                 LASSERT(data);
1007                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1008                                           &watched->u.cli.cl_target_uuid);
1009                 RETURN(rc);
1010         case OBD_NOTIFY_ACTIVE:
1011                 /* lov want one or more _active_ targets for work */
1012                 /* activate event should be pass lov idx as argument */
1013         case OBD_NOTIFY_SYNC:
1014         case OBD_NOTIFY_SYNC_NONBLOCK:
1015                 /* sync event should be pass lov idx as argument */
1016                 break;
1017         default:
1018                 RETURN(0);
1019         }
1020
1021         if (obd->obd_recovering) {
1022                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1023                       obd->obd_name,
1024                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1025                 /* We still have to fix the lov descriptor for ost's added
1026                    after the mdt in the config log.  They didn't make it into
1027                    mds_lov_connect. */
1028                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1029                                          &watched->u.cli.cl_target_uuid);
1030         } else {
1031                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1032         }
1033         RETURN(rc);
1034 }