Whamcloud - gitweb
40d477ddb27f93d1f295bf0ba8d72809f24cb0af
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         if ((libcfs_debug & D_INFO) == 0)
62                 return;
63
64         CDEBUG(D_INFO, "dump from %s\n", label);
65         if (mds->mds_lov_page_dirty == NULL) {
66                 CERROR("NULL bitmap!\n");
67                 GOTO(skip_bitmap, i);
68         }
69
70         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71                 CDEBUG(D_INFO, "%u - %lx\n", i,
72                        mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i, j, data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty =
107                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108         if (mds->mds_lov_page_dirty == NULL)
109                 RETURN(-ENOMEM);
110
111
112         OBD_ALLOC(mds->mds_lov_page_array, size);
113         if (mds->mds_lov_page_array == NULL)
114                 GOTO(err_free_bitmap, rc = -ENOMEM);
115
116         /* open and test the lov objd file */
117         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
118         if (IS_ERR(file)) {
119                 rc = PTR_ERR(file);
120                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121                 GOTO(err_free, rc = PTR_ERR(file));
122         }
123         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125                        file->f_dentry->d_inode->i_mode);
126                 GOTO(err_open, rc = -ENOENT);
127         }
128         mds->mds_lov_objid_filp = file;
129
130         RETURN (0);
131 err_open:
132         if (filp_close((struct file *)file, 0))
133                 CERROR("can't close %s after error\n", LOV_OBJID);
134 err_free:
135         OBD_FREE(mds->mds_lov_page_array, size);
136 err_free_bitmap:
137         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
138
139         RETURN(rc);
140 }
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168
169 /**
170  * currently exist two ways for know about ost count and max ost index.
171  * first - after ost is connected to mds and sync process finished
172  * second - get from lmm in recovery process, in case when mds not have configs,
173  * and ost isn't registered in mgs.
174  *
175  * \param mds pointer to mds structure
176  * \param index maxium ost index
177  *
178  * \retval -ENOMEM is not hame memory for new page
179  * \retval 0 is update passed
180  */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
182 {
183         __u32 page = index / OBJID_PER_PAGE();
184         __u32 off = index % OBJID_PER_PAGE();
185         obd_id *data =  mds->mds_lov_page_array[page];
186
187         if (data == NULL) {
188                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189                 if (data == NULL)
190                         RETURN(-ENOMEM);
191
192                 mds->mds_lov_page_array[page] = data;
193         }
194
195         if (index > mds->mds_lov_objid_max_index) {
196                 mds->mds_lov_objid_lastpage = page;
197                 mds->mds_lov_objid_lastidx = off;
198                 mds->mds_lov_objid_max_index = index;
199         }
200
201         /* workaround - New target not in objids file; increase mdsize */
202         /* ld_tgt_count is used as the max index everywhere, despite its name. */
203         if (data[off] == 0) {
204                 __u32 stripes;
205
206                 data[off] = 1;
207                 mds->mds_lov_objid_count++;
208                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
209                                 mds->mds_lov_objid_count);
210
211                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
212                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
213
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
215                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
224 {
225         __u32 page = index / OBJID_PER_PAGE();
226         __u32 off = index % OBJID_PER_PAGE();
227         obd_id *data =  mds->mds_lov_page_array[page];
228
229         return (data[off] > 0);
230 }
231
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
233 {
234         struct lov_ost_data_v1 *data;
235         __u32 count;
236         int rc = 0;
237         __u32 j;
238
239         /* if we create file without objects - lmm is NULL */
240         if (lmm == NULL)
241                 return 0;
242
243         switch (le32_to_cpu(lmm->lmm_magic)) {
244                 case LOV_MAGIC_V1:
245                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
246                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
247                         break;
248                 case LOV_MAGIC_V3:
249                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
250                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
251                         break;
252                 default:
253                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
254                         RETURN(-EINVAL);
255         }
256
257
258         cfs_mutex_down(&obd->obd_dev_sem);
259         for (j = 0; j < count; j++) {
260                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
261                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
262                         rc = -ENOMEM;
263                         break;
264                 }
265         }
266         cfs_mutex_up(&obd->obd_dev_sem);
267
268         RETURN(rc);
269 }
270 EXPORT_SYMBOL(mds_lov_prepare_objids);
271
272 /*
273  * write llog orphan record about lost ost object,
274  * Special lsm is allocated with single stripe, caller should deallocated it
275  * after use
276  */
277 static int mds_log_lost_precreated(struct obd_device *obd,
278                                    struct lov_stripe_md **lsmp, int *stripes,
279                                    obd_id id, obd_count count, int idx)
280 {
281         struct lov_stripe_md *lsm = *lsmp;
282         int rc;
283         ENTRY;
284
285         if (*lsmp == NULL) {
286                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
287                 if (rc < 0)
288                         RETURN(rc);
289                 /* need only one stripe, save old value */
290                 *stripes = lsm->lsm_stripe_count;
291                 lsm->lsm_stripe_count = 1;
292                 *lsmp = lsm;
293         }
294
295         lsm->lsm_oinfo[0]->loi_id = id;
296         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
297         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
298
299         rc = mds_log_op_orphan(obd, lsm, count);
300         RETURN(rc);
301 }
302
303 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
304 {
305         struct mds_obd *mds = &obd->u.mds;
306         int j;
307         struct lov_ost_data_v1 *obj;
308         struct lov_stripe_md *lsm = NULL;
309         int stripes = 0;
310         int count;
311         ENTRY;
312
313         /* if we create file without objects - lmm is NULL */
314         if (lmm == NULL)
315                 return;
316
317         switch (le32_to_cpu(lmm->lmm_magic)) {
318                 case LOV_MAGIC_V1:
319                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
320                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
321                         break;
322                 case LOV_MAGIC_V3:
323                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
324                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
325                         break;
326                 default:
327                         CERROR("Unknow lmm type %X !\n",
328                                le32_to_cpu(lmm->lmm_magic));
329                         return;
330         }
331
332         for (j = 0; j < count; j++) {
333                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
334                 obd_id id = le64_to_cpu(obj[j].l_object_id);
335                 __u32 page = i / OBJID_PER_PAGE();
336                 __u32 idx = i % OBJID_PER_PAGE();
337                 obd_id *data;
338
339                 data = mds->mds_lov_page_array[page];
340
341                 CDEBUG(D_INODE,"update last object for ost %u"
342                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
343                 if (id > data[idx]) {
344                         int lost = id - data[idx] - 1;
345                         /* we might have lost precreated objects due to VBR */
346                         if (lost > 0 && obd->obd_recovering) {
347                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
348                                 if (!obd->obd_version_recov)
349                                         CERROR("Unexpected gap in objids\n");
350                                 /* lsm is allocated if NULL */
351                                 mds_log_lost_precreated(obd, &lsm, &stripes,
352                                                         data[idx]+1, lost, i);
353                         }
354                         data[idx] = id;
355                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
356                 }
357         }
358         if (lsm) {
359                 /* restore stripes number */
360                 lsm->lsm_stripe_count = stripes;
361                 obd_free_memmd(mds->mds_lov_exp, &lsm);
362         }
363         EXIT;
364         return;
365 }
366 EXPORT_SYMBOL(mds_lov_update_objids);
367
368 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
369                                     __u32 count)
370 {
371         __u32 i;
372         __u32 stripes;
373
374         for (i = 0; i < count; i++) {
375                 if (data[i] == 0)
376                         continue;
377
378                 mds->mds_lov_objid_count++;
379         }
380
381         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
382                          mds->mds_lov_objid_count);
383
384         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
385         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
386
387         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388                "%d/%d\n", stripes, mds->mds_max_mdsize,
389                mds->mds_max_cookiesize);
390
391         EXIT;
392         return 0;
393 }
394
395 static int mds_lov_read_objids(struct obd_device *obd)
396 {
397         struct mds_obd *mds = &obd->u.mds;
398         loff_t off = 0;
399         int i, rc = 0, count = 0, page = 0;
400         unsigned long size;
401         ENTRY;
402
403         /* Read everything in the file, even if our current lov desc
404            has fewer targets. Old targets not in the lov descriptor
405            during mds setup may still have valid objids. */
406         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
407         if (size == 0)
408                 RETURN(0);
409
410         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
411         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
412         for (i = 0; i < page; i++) {
413                 obd_id *data;
414                 loff_t off_old = off;
415
416                 LASSERT(mds->mds_lov_page_array[i] == NULL);
417                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
418                 if (mds->mds_lov_page_array[i] == NULL)
419                         GOTO(out, rc = -ENOMEM);
420
421                 data = mds->mds_lov_page_array[i];
422
423                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
424                                         MDS_LOV_ALLOC_SIZE, &off);
425                 if (rc < 0) {
426                         CERROR("Error reading objids %d\n", rc);
427                         GOTO(out, rc);
428                 }
429                 if (off == off_old) /* hole is read */
430                         off += MDS_LOV_ALLOC_SIZE;
431
432                 count = (off - off_old) / sizeof(obd_id);
433                 if (mds_lov_update_from_read(mds, data, count)) {
434                         CERROR("Can't update mds data\n");
435                         GOTO(out, rc = -EIO);
436                 }
437         }
438         mds->mds_lov_objid_lastpage = page - 1;
439         mds->mds_lov_objid_lastidx = count - 1;
440
441         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
442                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
443 out:
444         mds_lov_dump_objids("read",obd);
445
446         RETURN(rc);
447 }
448
449 int mds_lov_write_objids(struct obd_device *obd)
450 {
451         struct mds_obd *mds = &obd->u.mds;
452         int i = 0, rc = 0;
453         ENTRY;
454
455         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
456                 RETURN(0);
457
458         mds_lov_dump_objids("write", obd);
459
460         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
461                 obd_id *data =  mds->mds_lov_page_array[i];
462                 unsigned int size = MDS_LOV_ALLOC_SIZE;
463                 loff_t off = i * size;
464
465                 LASSERT(data != NULL);
466
467                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
468                         continue;
469
470                 /* check for particaly filled last page */
471                 if (i == mds->mds_lov_objid_lastpage)
472                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
473
474                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
475                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
476                                          size, &off, 0);
477                 if (rc < 0) {
478                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
479                         break;
480                 }
481         }
482         if (rc >= 0)
483                 rc = 0;
484
485         RETURN(rc);
486 }
487 EXPORT_SYMBOL(mds_lov_write_objids);
488
489 static int mds_lov_get_objid(struct obd_device * obd,
490                              obd_id idx)
491 {
492         struct mds_obd *mds = &obd->u.mds;
493         struct obd_export *lov_exp = mds->mds_lov_exp;
494         unsigned int page;
495         unsigned int off;
496         obd_id *data;
497         __u32 size;
498         int rc = 0;
499         ENTRY;
500
501         page = idx / OBJID_PER_PAGE();
502         off = idx % OBJID_PER_PAGE();
503         data = mds->mds_lov_page_array[page];
504
505         if (data[off] < 2) {
506                 /* We never read this lastid; ask the osc */
507                 struct obd_id_info lastid;
508
509                 size = sizeof(lastid);
510                 lastid.idx = idx;
511                 lastid.data = &data[off];
512                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
513                                   &size, &lastid, NULL);
514                 if (rc)
515                         GOTO(out, rc);
516
517                 /* workaround for clean filter */
518                 if (data[off] == 0)
519                         data[off] = 1;
520
521                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
522         }
523         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
524                idx, data, page, off, data[off]);
525 out:
526         RETURN(rc);
527 }
528
529 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
530 {
531         int rc;
532         struct obdo oa = { 0 };
533         struct obd_trans_info oti = {0};
534         struct lov_stripe_md  *empty_ea = NULL;
535         ENTRY;
536
537         LASSERT(mds->mds_lov_page_array != NULL);
538
539         /* This create will in fact either create or destroy:  If the OST is
540          * missing objects below this ID, they will be created.  If it finds
541          * objects above this ID, they will be removed. */
542         memset(&oa, 0, sizeof(oa));
543         oa.o_flags = OBD_FL_DELORPHAN;
544         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
545         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
546         if (ost_uuid != NULL)
547                 oti.oti_ost_uuid = ost_uuid;
548
549         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
550
551         RETURN(rc);
552 }
553
554 /* for one target */
555 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
556 {
557         struct mds_obd *mds = &obd->u.mds;
558         int rc;
559         struct obd_id_info info;
560         ENTRY;
561
562         LASSERT(!obd->obd_recovering);
563
564         info.idx = idx;
565         info.data = id;
566         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
567                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
568         if (rc)
569                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
570                         obd->obd_name, rc);
571
572         RETURN(rc);
573 }
574
575 /* Update the lov desc for a new size lov. */
576 static int mds_lov_update_desc(struct obd_device *obd, int idx,
577                                struct obd_uuid *uuid)
578 {
579         struct mds_obd *mds = &obd->u.mds;
580         struct lov_desc *ld;
581         __u32 valsize = sizeof(mds->mds_lov_desc);
582         int rc = 0;
583         ENTRY;
584
585         OBD_ALLOC(ld, sizeof(*ld));
586         if (!ld)
587                 RETURN(-ENOMEM);
588
589         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
590                           &valsize, ld, NULL);
591         if (rc)
592                 GOTO(out, rc);
593
594         /* Don't change the mds_lov_desc until the objids size matches the
595            count (paranoia) */
596         mds->mds_lov_desc = *ld;
597         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
598                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
599
600         cfs_mutex_down(&obd->obd_dev_sem);
601         rc = mds_lov_update_max_ost(mds, idx);
602         cfs_mutex_up(&obd->obd_dev_sem);
603         if (rc != 0)
604                 GOTO(out, rc );
605
606         /* If we added a target we have to reconnect the llogs */
607         /* We only _need_ to do this at first add (idx), or the first time
608            after recovery.  However, it should now be safe to call anytime. */
609         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
610         if (rc)
611                 GOTO(out, rc);
612
613 out:
614         OBD_FREE(ld, sizeof(*ld));
615         RETURN(rc);
616 }
617
618 /* Inform MDS about new/updated target */
619 static int mds_lov_update_mds(struct obd_device *obd,
620                               struct obd_device *watched,
621                               __u32 idx)
622 {
623         struct mds_obd *mds = &obd->u.mds;
624         int rc = 0;
625         int page;
626         int off;
627         obd_id *data;
628         ENTRY;
629
630         LASSERT(mds_lov_objinit(mds, idx));
631
632         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
633                idx, obd->obd_recovering, obd->obd_async_recov,
634                mds->mds_lov_desc.ld_tgt_count);
635
636         /* idx is set as data from lov_notify. */
637         if (obd->obd_recovering)
638                 GOTO(out, rc);
639
640         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
641                 CERROR("index %d > count %d!\n", idx,
642                        mds->mds_lov_desc.ld_tgt_count);
643                 GOTO(out, rc = -EINVAL);
644         }
645
646         rc = mds_lov_get_objid(obd, idx);
647         if (rc)
648                 GOTO(out, rc);
649
650         page = idx / OBJID_PER_PAGE();
651         off = idx % OBJID_PER_PAGE();
652         data = mds->mds_lov_page_array[page];
653
654         /* We have read this lastid from disk; tell the osc.
655            Don't call this during recovery. */
656         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
657         if (rc) {
658                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
659                 /* Don't abort the rest of the sync */
660                 rc = 0;
661         } else {
662                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
663                         data[off], idx, rc);
664         }
665 out:
666         RETURN(rc);
667 }
668
669 /* update the LOV-OSC knowledge of the last used object id's */
670 int mds_lov_connect(struct obd_device *obd, char * lov_name)
671 {
672         struct mds_obd *mds = &obd->u.mds;
673         struct obd_connect_data *data;
674         int rc;
675         ENTRY;
676
677         if (IS_ERR(mds->mds_lov_obd))
678                 RETURN(PTR_ERR(mds->mds_lov_obd));
679
680         if (mds->mds_lov_obd)
681                 RETURN(0);
682
683         mds->mds_lov_obd = class_name2obd(lov_name);
684         if (!mds->mds_lov_obd) {
685                 CERROR("MDS cannot locate LOV %s\n", lov_name);
686                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
687                 RETURN(-ENOTCONN);
688         }
689
690         cfs_mutex_down(&obd->obd_dev_sem);
691         rc = mds_lov_read_objids(obd);
692         cfs_mutex_up(&obd->obd_dev_sem);
693         if (rc) {
694                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
695                 GOTO(err_exit, rc);
696         }
697
698         rc = obd_register_observer(mds->mds_lov_obd, obd);
699         if (rc) {
700                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
701                        lov_name, rc);
702                 GOTO(err_exit, rc);
703         }
704
705         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
706          * targets */
707         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
708
709         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
710
711         OBD_ALLOC(data, sizeof(*data));
712         if (data == NULL)
713                 GOTO(err_exit, rc = -ENOMEM);
714
715         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
716                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
717                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
718                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
719                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
720                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
721                                   OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
722 #ifdef HAVE_LRU_RESIZE_SUPPORT
723         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
724 #endif
725         data->ocd_version = LUSTRE_VERSION_CODE;
726         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
727         /* send max bytes per rpc */
728         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
729         /* send the list of supported checksum types */
730         data->ocd_cksum_types = OBD_CKSUM_ALL;
731         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
732         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
733         OBD_FREE(data, sizeof(*data));
734         if (rc) {
735                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
736                 mds->mds_lov_obd = ERR_PTR(rc);
737                 RETURN(rc);
738         }
739
740         /* I want to see a callback happen when the OBD moves to a
741          * "For General Use" state, and that's when we'll call
742          * set_nextid().  The class driver can help us here, because
743          * it can use the obd_recovering flag to determine when the
744          * the OBD is full available. */
745         /* MDD device will care about that
746         if (!obd->obd_recovering)
747                 rc = mds_postrecov(obd);
748          */
749         RETURN(rc);
750
751 err_exit:
752         mds->mds_lov_exp = NULL;
753         mds->mds_lov_obd = ERR_PTR(rc);
754         RETURN(rc);
755 }
756
757 int mds_lov_disconnect(struct obd_device *obd)
758 {
759         struct mds_obd *mds = &obd->u.mds;
760         int rc = 0;
761         ENTRY;
762
763         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
764                 obd_register_observer(mds->mds_lov_obd, NULL);
765
766                 /* The actual disconnect of the mds_lov will be called from
767                  * class_disconnect_exports from mds_lov_clean. So we have to
768                  * ensure that class_cleanup doesn't fail due to the extra ref
769                  * we're holding now. The mechanism to do that already exists -
770                  * the obd_force flag. We'll drop the final ref to the
771                  * mds_lov_exp in mds_cleanup. */
772                 mds->mds_lov_obd->obd_force = 1;
773         }
774
775         RETURN(rc);
776 }
777
778 struct mds_lov_sync_info {
779         struct obd_device    *mlsi_obd;     /* the lov device to sync */
780         struct obd_device    *mlsi_watched; /* target osc */
781         __u32                 mlsi_index;   /* index of target */
782 };
783
784 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
785 {
786         struct mds_capa_info    info = { .uuid = uuid };
787         struct lustre_capa_key *key;
788         int i, rc = 0;
789
790         ENTRY;
791
792         if (!mds->mds_capa_keys)
793                 RETURN(0);
794
795         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
796         for (i = 0; i < 2; i++) {
797                 key = &mds->mds_capa_keys[i];
798                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
799
800                 info.capa = key;
801                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
802                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
803                 if (rc) {
804                         DEBUG_CAPA_KEY(D_ERROR, key,
805                                        "propagate failed (rc = %d) for", rc);
806                         RETURN(rc);
807                 }
808         }
809
810         RETURN(0);
811 }
812
813 /* We only sync one osc at a time, so that we don't have to hold
814    any kind of lock on the whole mds_lov_desc, which may change
815    (grow) as a result of mds_lov_add_ost.  This also avoids any
816    kind of mismatch between the lov_desc and the mds_lov_desc,
817    which are not in lock-step during lov_add_obd */
818 static int __mds_lov_synchronize(void *data)
819 {
820         struct mds_lov_sync_info *mlsi = data;
821         struct obd_device *obd = mlsi->mlsi_obd;
822         struct obd_device *watched = mlsi->mlsi_watched;
823         struct mds_obd *mds = &obd->u.mds;
824         struct obd_uuid *uuid;
825         __u32  idx = mlsi->mlsi_index;
826         struct mds_group_info mgi;
827         struct llog_ctxt *ctxt;
828         int rc = 0;
829         ENTRY;
830
831         OBD_FREE_PTR(mlsi);
832
833         LASSERT(obd);
834         LASSERT(watched);
835         uuid = &watched->u.cli.cl_target_uuid;
836         LASSERT(uuid);
837
838         cfs_down_read(&mds->mds_notify_lock);
839         if (obd->obd_stopping || obd->obd_fail)
840                 GOTO(out, rc = -ENODEV);
841
842         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
843         rc = mds_lov_update_mds(obd, watched, idx);
844         if (rc != 0) {
845                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
846                 GOTO(out, rc);
847         }
848         mgi.group = mdt_to_obd_objseq(mds->mds_id);
849         mgi.uuid = uuid;
850
851         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
852                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
853         if (rc != 0)
854                 GOTO(out, rc);
855         /* propagate capability keys */
856         rc = mds_propagate_capa_keys(mds, uuid);
857         if (rc)
858                 GOTO(out, rc);
859
860         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
861         if (!ctxt)
862                 GOTO(out, rc = -ENODEV);
863
864         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
865         rc = llog_connect(ctxt, NULL, NULL, uuid);
866         llog_ctxt_put(ctxt);
867         if (rc != 0) {
868                 CERROR("%s failed at llog_origin_connect: %d\n",
869                        obd_uuid2str(uuid), rc);
870                 GOTO(out, rc);
871         }
872
873         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
874               obd->obd_name, obd_uuid2str(uuid));
875
876         rc = mds_lov_clear_orphans(mds, uuid);
877         if (rc != 0) {
878                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
879                        obd_uuid2str(uuid), rc);
880                 GOTO(out, rc);
881         }
882
883 #ifdef HAVE_QUOTA_SUPPORT
884         if (obd->obd_upcall.onu_owner) {
885                 /*
886                  * This is a hack for mds_notify->mdd_notify. When the mds obd
887                  * in mdd is removed, This hack should be removed.
888                  */
889                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
890                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
891                                                 obd->obd_upcall.onu_owner,NULL);
892         }
893 #endif
894         EXIT;
895 out:
896         cfs_up_read(&mds->mds_notify_lock);
897         if (rc) {
898                 /* Deactivate it for safety */
899                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
900                        rc);
901                 if (!obd->obd_stopping && mds->mds_lov_obd &&
902                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
903                         obd_notify(mds->mds_lov_obd, watched,
904                                    OBD_NOTIFY_INACTIVE, NULL);
905         }
906
907         class_decref(obd, "mds_lov_synchronize", obd);
908         return rc;
909 }
910
911 int mds_lov_synchronize(void *data)
912 {
913         struct mds_lov_sync_info *mlsi = data;
914         char name[20];
915
916         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
917         cfs_daemonize_ctxt(name);
918
919         RETURN(__mds_lov_synchronize(data));
920 }
921
922 int mds_lov_start_synchronize(struct obd_device *obd,
923                               struct obd_device *watched,
924                               void *data, enum obd_notify_event ev)
925 {
926         struct mds_lov_sync_info *mlsi;
927         int rc;
928         struct obd_uuid *uuid;
929         ENTRY;
930
931         LASSERT(watched);
932         uuid = &watched->u.cli.cl_target_uuid;
933
934         OBD_ALLOC(mlsi, sizeof(*mlsi));
935         if (mlsi == NULL)
936                 RETURN(-ENOMEM);
937
938         LASSERT(data);
939         mlsi->mlsi_obd = obd;
940         mlsi->mlsi_watched = watched;
941         mlsi->mlsi_index = *(__u32 *)data;
942
943         /* Although class_export_get(obd->obd_self_export) would lock
944            the MDS in place, since it's only a self-export
945            it doesn't lock the LOV in place.  The LOV can be disconnected
946            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
947            Simply taking an export ref on the LOV doesn't help, because it's
948            still disconnected. Taking an obd reference insures that we don't
949            disconnect the LOV.  This of course means a cleanup won't
950            finish for as long as the sync is blocking. */
951         class_incref(obd, "mds_lov_synchronize", obd);
952
953         if (ev != OBD_NOTIFY_SYNC) {
954                 /* Synchronize in the background */
955                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
956                                        CLONE_VM | CLONE_FILES);
957                 if (rc < 0) {
958                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
959                                obd->obd_name, rc);
960                         class_decref(obd, "mds_lov_synchronize", obd);
961                 } else {
962                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
963                                "thread=%d\n", obd->obd_name,
964                                mlsi->mlsi_index, rc);
965                         rc = 0;
966                 }
967         } else {
968                 rc = __mds_lov_synchronize((void *)mlsi);
969         }
970
971         RETURN(rc);
972 }
973
974 int mds_notify(struct obd_device *obd, struct obd_device *watched,
975                enum obd_notify_event ev, void *data)
976 {
977         struct mds_obd *mds = &obd->u.mds;
978         int rc = 0;
979         ENTRY;
980
981         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
982
983         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
984                 CERROR("unexpected notification of %s %s!\n",
985                        watched->obd_type->typ_name, watched->obd_name);
986                 RETURN(-EINVAL);
987         }
988
989         /*XXX this notifies the MDD until lov handling use old mds code
990          * must non block!
991          */
992         if (obd->obd_upcall.onu_owner) {
993                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
994                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
995                                                  obd->obd_upcall.onu_owner,
996                                                  &mds->mds_obt.obt_mount_count);
997         }
998
999         switch (ev) {
1000         /* We only handle these: */
1001         case OBD_NOTIFY_CREATE:
1002                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1003                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1004                 /* We still have to fix the lov descriptor for ost's */
1005                 LASSERT(data);
1006                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1007                                           &watched->u.cli.cl_target_uuid);
1008                 RETURN(rc);
1009         case OBD_NOTIFY_ACTIVE:
1010                 /* lov want one or more _active_ targets for work */
1011                 /* activate event should be pass lov idx as argument */
1012         case OBD_NOTIFY_SYNC:
1013         case OBD_NOTIFY_SYNC_NONBLOCK:
1014                 /* sync event should be pass lov idx as argument */
1015                 break;
1016         default:
1017                 RETURN(0);
1018         }
1019
1020         if (obd->obd_recovering) {
1021                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1022                       obd->obd_name,
1023                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1024                 /* We still have to fix the lov descriptor for ost's added
1025                    after the mdt in the config log.  They didn't make it into
1026                    mds_lov_connect. */
1027                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1028                                          &watched->u.cli.cl_target_uuid);
1029         } else {
1030                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1031         }
1032         RETURN(rc);
1033 }