Whamcloud - gitweb
LU-665 mds: Reduce MDS-OSS connection flags
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         if ((libcfs_debug & D_INFO) == 0)
62                 return;
63
64         CDEBUG(D_INFO, "dump from %s\n", label);
65         if (mds->mds_lov_page_dirty == NULL) {
66                 CERROR("NULL bitmap!\n");
67                 GOTO(skip_bitmap, i);
68         }
69
70         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71                 CDEBUG(D_INFO, "%u - %lx\n", i,
72                        mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i, j, data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty =
107                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108         if (mds->mds_lov_page_dirty == NULL)
109                 RETURN(-ENOMEM);
110
111
112         OBD_ALLOC(mds->mds_lov_page_array, size);
113         if (mds->mds_lov_page_array == NULL)
114                 GOTO(err_free_bitmap, rc = -ENOMEM);
115
116         /* open and test the lov objd file */
117         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
118         if (IS_ERR(file)) {
119                 rc = PTR_ERR(file);
120                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121                 GOTO(err_free, rc = PTR_ERR(file));
122         }
123         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125                        file->f_dentry->d_inode->i_mode);
126                 GOTO(err_open, rc = -ENOENT);
127         }
128         mds->mds_lov_objid_filp = file;
129
130         RETURN (0);
131 err_open:
132         if (filp_close((struct file *)file, 0))
133                 CERROR("can't close %s after error\n", LOV_OBJID);
134 err_free:
135         OBD_FREE(mds->mds_lov_page_array, size);
136 err_free_bitmap:
137         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
138
139         RETURN(rc);
140 }
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168
169 /**
170  * currently exist two ways for know about ost count and max ost index.
171  * first - after ost is connected to mds and sync process finished
172  * second - get from lmm in recovery process, in case when mds not have configs,
173  * and ost isn't registered in mgs.
174  *
175  * \param mds pointer to mds structure
176  * \param index maxium ost index
177  *
178  * \retval -ENOMEM is not hame memory for new page
179  * \retval 0 is update passed
180  */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
182 {
183         __u32 page = index / OBJID_PER_PAGE();
184         __u32 off = index % OBJID_PER_PAGE();
185         obd_id *data =  mds->mds_lov_page_array[page];
186
187         if (data == NULL) {
188                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189                 if (data == NULL)
190                         RETURN(-ENOMEM);
191
192                 mds->mds_lov_page_array[page] = data;
193         }
194
195         if (index > mds->mds_lov_objid_max_index) {
196                 mds->mds_lov_objid_lastpage = page;
197                 mds->mds_lov_objid_lastidx = off;
198                 mds->mds_lov_objid_max_index = index;
199         }
200
201         /* workaround - New target not in objids file; increase mdsize */
202         /* ld_tgt_count is used as the max index everywhere, despite its name. */
203         if (data[off] == 0) {
204                 __u32 stripes;
205
206                 data[off] = 1;
207                 mds->mds_lov_objid_count++;
208                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
209                                 mds->mds_lov_objid_count);
210
211                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
212                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
213
214                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
215                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216                        mds->mds_max_cookiesize);
217         }
218
219         EXIT;
220         return 0;
221 }
222
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
224 {
225         __u32 page = index / OBJID_PER_PAGE();
226         __u32 off = index % OBJID_PER_PAGE();
227         obd_id *data =  mds->mds_lov_page_array[page];
228
229         return (data[off] > 0);
230 }
231
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
233 {
234         struct lov_ost_data_v1 *data;
235         __u32 count;
236         int rc = 0;
237         __u32 j;
238
239         /* if we create file without objects - lmm is NULL */
240         if (lmm == NULL)
241                 return 0;
242
243         switch (le32_to_cpu(lmm->lmm_magic)) {
244                 case LOV_MAGIC_V1:
245                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
246                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
247                         break;
248                 case LOV_MAGIC_V3:
249                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
250                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
251                         break;
252                 default:
253                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
254                         RETURN(-EINVAL);
255         }
256
257
258         cfs_mutex_down(&obd->obd_dev_sem);
259         for (j = 0; j < count; j++) {
260                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
261                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
262                         rc = -ENOMEM;
263                         break;
264                 }
265         }
266         cfs_mutex_up(&obd->obd_dev_sem);
267
268         RETURN(rc);
269 }
270 EXPORT_SYMBOL(mds_lov_prepare_objids);
271
272 /*
273  * write llog orphan record about lost ost object,
274  * Special lsm is allocated with single stripe, caller should deallocated it
275  * after use
276  */
277 static int mds_log_lost_precreated(struct obd_device *obd,
278                                    struct lov_stripe_md **lsmp, int *stripes,
279                                    obd_id id, obd_count count, int idx)
280 {
281         struct lov_stripe_md *lsm = *lsmp;
282         int rc;
283         ENTRY;
284
285         if (*lsmp == NULL) {
286                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
287                 if (rc < 0)
288                         RETURN(rc);
289                 /* need only one stripe, save old value */
290                 *stripes = lsm->lsm_stripe_count;
291                 lsm->lsm_stripe_count = 1;
292                 *lsmp = lsm;
293         }
294
295         lsm->lsm_oinfo[0]->loi_id = id;
296         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
297         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
298
299         rc = mds_log_op_orphan(obd, lsm, count);
300         RETURN(rc);
301 }
302
303 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
304 {
305         struct mds_obd *mds = &obd->u.mds;
306         int j;
307         struct lov_ost_data_v1 *obj;
308         struct lov_stripe_md *lsm = NULL;
309         int stripes = 0;
310         int count;
311         ENTRY;
312
313         /* if we create file without objects - lmm is NULL */
314         if (lmm == NULL)
315                 return;
316
317         switch (le32_to_cpu(lmm->lmm_magic)) {
318                 case LOV_MAGIC_V1:
319                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
320                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
321                         break;
322                 case LOV_MAGIC_V3:
323                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
324                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
325                         break;
326                 default:
327                         CERROR("Unknow lmm type %X !\n",
328                                le32_to_cpu(lmm->lmm_magic));
329                         return;
330         }
331
332         for (j = 0; j < count; j++) {
333                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
334                 obd_id id = le64_to_cpu(obj[j].l_object_id);
335                 __u32 page = i / OBJID_PER_PAGE();
336                 __u32 idx = i % OBJID_PER_PAGE();
337                 obd_id *data;
338
339                 data = mds->mds_lov_page_array[page];
340
341                 CDEBUG(D_INODE,"update last object for ost %u"
342                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
343                 if (id > data[idx]) {
344                         int lost = id - data[idx] - 1;
345                         /* we might have lost precreated objects due to VBR */
346                         if (lost > 0 && obd->obd_recovering) {
347                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
348                                 if (!obd->obd_version_recov)
349                                         CERROR("Unexpected gap in objids\n");
350                                 /* lsm is allocated if NULL */
351                                 mds_log_lost_precreated(obd, &lsm, &stripes,
352                                                         data[idx]+1, lost, i);
353                         }
354                         data[idx] = id;
355                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
356                 }
357         }
358         if (lsm) {
359                 /* restore stripes number */
360                 lsm->lsm_stripe_count = stripes;
361                 obd_free_memmd(mds->mds_lov_exp, &lsm);
362         }
363         EXIT;
364         return;
365 }
366 EXPORT_SYMBOL(mds_lov_update_objids);
367
368 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
369                                     __u32 count)
370 {
371         __u32 i;
372         __u32 stripes;
373
374         for (i = 0; i < count; i++) {
375                 if (data[i] == 0)
376                         continue;
377
378                 mds->mds_lov_objid_count++;
379         }
380
381         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
382                          mds->mds_lov_objid_count);
383
384         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
385         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
386
387         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388                "%d/%d\n", stripes, mds->mds_max_mdsize,
389                mds->mds_max_cookiesize);
390
391         EXIT;
392         return 0;
393 }
394
395 static int mds_lov_read_objids(struct obd_device *obd)
396 {
397         struct mds_obd *mds = &obd->u.mds;
398         loff_t off = 0;
399         int i, rc = 0, count = 0, page = 0;
400         unsigned long size;
401         ENTRY;
402
403         /* Read everything in the file, even if our current lov desc
404            has fewer targets. Old targets not in the lov descriptor
405            during mds setup may still have valid objids. */
406         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
407         if (size == 0)
408                 RETURN(0);
409
410         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
411         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
412         for (i = 0; i < page; i++) {
413                 obd_id *data;
414                 loff_t off_old = off;
415
416                 LASSERT(mds->mds_lov_page_array[i] == NULL);
417                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
418                 if (mds->mds_lov_page_array[i] == NULL)
419                         GOTO(out, rc = -ENOMEM);
420
421                 data = mds->mds_lov_page_array[i];
422
423                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
424                                         MDS_LOV_ALLOC_SIZE, &off);
425                 if (rc < 0) {
426                         CERROR("Error reading objids %d\n", rc);
427                         GOTO(out, rc);
428                 }
429                 if (off == off_old) /* hole is read */
430                         off += MDS_LOV_ALLOC_SIZE;
431
432                 count = (off - off_old) / sizeof(obd_id);
433                 if (mds_lov_update_from_read(mds, data, count)) {
434                         CERROR("Can't update mds data\n");
435                         GOTO(out, rc = -EIO);
436                 }
437         }
438         mds->mds_lov_objid_lastpage = page - 1;
439         mds->mds_lov_objid_lastidx = count - 1;
440
441         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
442                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
443 out:
444         mds_lov_dump_objids("read",obd);
445
446         RETURN(rc);
447 }
448
449 int mds_lov_write_objids(struct obd_device *obd)
450 {
451         struct mds_obd *mds = &obd->u.mds;
452         int i = 0, rc = 0;
453         ENTRY;
454
455         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
456                 RETURN(0);
457
458         mds_lov_dump_objids("write", obd);
459
460         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
461                 obd_id *data =  mds->mds_lov_page_array[i];
462                 unsigned int size = MDS_LOV_ALLOC_SIZE;
463                 loff_t off = i * size;
464
465                 LASSERT(data != NULL);
466
467                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
468                         continue;
469
470                 /* check for particaly filled last page */
471                 if (i == mds->mds_lov_objid_lastpage)
472                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
473
474                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
475                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
476                                          size, &off, 0);
477                 if (rc < 0) {
478                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
479                         break;
480                 }
481         }
482         if (rc >= 0)
483                 rc = 0;
484
485         RETURN(rc);
486 }
487 EXPORT_SYMBOL(mds_lov_write_objids);
488
489 static int mds_lov_get_objid(struct obd_device * obd,
490                              obd_id idx)
491 {
492         struct mds_obd *mds = &obd->u.mds;
493         struct obd_export *lov_exp = mds->mds_lov_exp;
494         unsigned int page;
495         unsigned int off;
496         obd_id *data;
497         __u32 size;
498         int rc = 0;
499         ENTRY;
500
501         page = idx / OBJID_PER_PAGE();
502         off = idx % OBJID_PER_PAGE();
503         data = mds->mds_lov_page_array[page];
504
505         if (data[off] < 2) {
506                 /* We never read this lastid; ask the osc */
507                 struct obd_id_info lastid;
508
509                 size = sizeof(lastid);
510                 lastid.idx = idx;
511                 lastid.data = &data[off];
512                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
513                                   &size, &lastid, NULL);
514                 if (rc)
515                         GOTO(out, rc);
516
517                 /* workaround for clean filter */
518                 if (data[off] == 0)
519                         data[off] = 1;
520
521                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
522         }
523         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
524                idx, data, page, off, data[off]);
525 out:
526         RETURN(rc);
527 }
528
529 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
530 {
531         int rc;
532         struct obdo oa = { 0 };
533         struct obd_trans_info oti = {0};
534         struct lov_stripe_md  *empty_ea = NULL;
535         ENTRY;
536
537         LASSERT(mds->mds_lov_page_array != NULL);
538
539         /* This create will in fact either create or destroy:  If the OST is
540          * missing objects below this ID, they will be created.  If it finds
541          * objects above this ID, they will be removed. */
542         memset(&oa, 0, sizeof(oa));
543         oa.o_flags = OBD_FL_DELORPHAN;
544         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
545         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
546         if (ost_uuid != NULL)
547                 oti.oti_ost_uuid = ost_uuid;
548
549         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
550
551         RETURN(rc);
552 }
553
554 /* for one target */
555 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
556 {
557         struct mds_obd *mds = &obd->u.mds;
558         int rc;
559         struct obd_id_info info;
560         ENTRY;
561
562         LASSERT(!obd->obd_recovering);
563
564         info.idx = idx;
565         info.data = id;
566         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
567                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
568         if (rc)
569                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
570                         obd->obd_name, rc);
571
572         RETURN(rc);
573 }
574
575 /* Update the lov desc for a new size lov. */
576 static int mds_lov_update_desc(struct obd_device *obd, int idx,
577                                struct obd_uuid *uuid)
578 {
579         struct mds_obd *mds = &obd->u.mds;
580         struct lov_desc *ld;
581         __u32 valsize = sizeof(mds->mds_lov_desc);
582         int rc = 0;
583         ENTRY;
584
585         OBD_ALLOC(ld, sizeof(*ld));
586         if (!ld)
587                 RETURN(-ENOMEM);
588
589         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
590                           &valsize, ld, NULL);
591         if (rc)
592                 GOTO(out, rc);
593
594         /* Don't change the mds_lov_desc until the objids size matches the
595            count (paranoia) */
596         mds->mds_lov_desc = *ld;
597         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
598                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
599
600         cfs_mutex_down(&obd->obd_dev_sem);
601         rc = mds_lov_update_max_ost(mds, idx);
602         cfs_mutex_up(&obd->obd_dev_sem);
603         if (rc != 0)
604                 GOTO(out, rc );
605
606         /* If we added a target we have to reconnect the llogs */
607         /* We only _need_ to do this at first add (idx), or the first time
608            after recovery.  However, it should now be safe to call anytime. */
609         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
610         if (rc)
611                 GOTO(out, rc);
612
613 out:
614         OBD_FREE(ld, sizeof(*ld));
615         RETURN(rc);
616 }
617
618 /* Inform MDS about new/updated target */
619 static int mds_lov_update_mds(struct obd_device *obd,
620                               struct obd_device *watched,
621                               __u32 idx)
622 {
623         struct mds_obd *mds = &obd->u.mds;
624         int rc = 0;
625         int page;
626         int off;
627         obd_id *data;
628         ENTRY;
629
630         LASSERT(mds_lov_objinit(mds, idx));
631
632         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
633                idx, obd->obd_recovering, obd->obd_async_recov,
634                mds->mds_lov_desc.ld_tgt_count);
635
636         /* idx is set as data from lov_notify. */
637         if (obd->obd_recovering)
638                 GOTO(out, rc);
639
640         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
641                 CERROR("index %d > count %d!\n", idx,
642                        mds->mds_lov_desc.ld_tgt_count);
643                 GOTO(out, rc = -EINVAL);
644         }
645
646         rc = mds_lov_get_objid(obd, idx);
647         if (rc)
648                 GOTO(out, rc);
649
650         page = idx / OBJID_PER_PAGE();
651         off = idx % OBJID_PER_PAGE();
652         data = mds->mds_lov_page_array[page];
653
654         /* We have read this lastid from disk; tell the osc.
655            Don't call this during recovery. */
656         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
657         if (rc) {
658                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
659                 /* Don't abort the rest of the sync */
660                 rc = 0;
661         } else {
662                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
663                         data[off], idx, rc);
664         }
665 out:
666         RETURN(rc);
667 }
668
669 /* update the LOV-OSC knowledge of the last used object id's */
670 int mds_lov_connect(struct obd_device *obd, char * lov_name)
671 {
672         struct mds_obd *mds = &obd->u.mds;
673         struct obd_connect_data *data;
674         int rc;
675         ENTRY;
676
677         if (IS_ERR(mds->mds_lov_obd))
678                 RETURN(PTR_ERR(mds->mds_lov_obd));
679
680         if (mds->mds_lov_obd)
681                 RETURN(0);
682
683         mds->mds_lov_obd = class_name2obd(lov_name);
684         if (!mds->mds_lov_obd) {
685                 CERROR("MDS cannot locate LOV %s\n", lov_name);
686                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
687                 RETURN(-ENOTCONN);
688         }
689
690         cfs_mutex_down(&obd->obd_dev_sem);
691         rc = mds_lov_read_objids(obd);
692         cfs_mutex_up(&obd->obd_dev_sem);
693         if (rc) {
694                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
695                 GOTO(err_exit, rc);
696         }
697
698         rc = obd_register_observer(mds->mds_lov_obd, obd);
699         if (rc) {
700                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
701                        lov_name, rc);
702                 GOTO(err_exit, rc);
703         }
704
705         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
706          * targets */
707         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
708
709         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
710
711         OBD_ALLOC(data, sizeof(*data));
712         if (data == NULL)
713                 GOTO(err_exit, rc = -ENOMEM);
714
715         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
716                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
717                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FULL20  |
718                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
719                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
720                                   OBD_CONNECT_SOM;
721 #ifdef HAVE_LRU_RESIZE_SUPPORT
722         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
723 #endif
724         data->ocd_version = LUSTRE_VERSION_CODE;
725         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
726         /* send max bytes per rpc */
727         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
728         /* send the list of supported checksum types */
729         data->ocd_cksum_types = cksum_types_supported();
730         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
731         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
732         OBD_FREE(data, sizeof(*data));
733         if (rc) {
734                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
735                 mds->mds_lov_obd = ERR_PTR(rc);
736                 RETURN(rc);
737         }
738
739         /* I want to see a callback happen when the OBD moves to a
740          * "For General Use" state, and that's when we'll call
741          * set_nextid().  The class driver can help us here, because
742          * it can use the obd_recovering flag to determine when the
743          * the OBD is full available. */
744         /* MDD device will care about that
745         if (!obd->obd_recovering)
746                 rc = mds_postrecov(obd);
747          */
748         RETURN(rc);
749
750 err_exit:
751         mds->mds_lov_exp = NULL;
752         mds->mds_lov_obd = ERR_PTR(rc);
753         RETURN(rc);
754 }
755
756 int mds_lov_disconnect(struct obd_device *obd)
757 {
758         struct mds_obd *mds = &obd->u.mds;
759         int rc = 0;
760         ENTRY;
761
762         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
763                 obd_register_observer(mds->mds_lov_obd, NULL);
764
765                 /* The actual disconnect of the mds_lov will be called from
766                  * class_disconnect_exports from mds_lov_clean. So we have to
767                  * ensure that class_cleanup doesn't fail due to the extra ref
768                  * we're holding now. The mechanism to do that already exists -
769                  * the obd_force flag. We'll drop the final ref to the
770                  * mds_lov_exp in mds_cleanup. */
771                 mds->mds_lov_obd->obd_force = 1;
772         }
773
774         RETURN(rc);
775 }
776
777 struct mds_lov_sync_info {
778         struct obd_device    *mlsi_obd;     /* the lov device to sync */
779         struct obd_device    *mlsi_watched; /* target osc */
780         __u32                 mlsi_index;   /* index of target */
781 };
782
783 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
784 {
785         struct mds_capa_info    info = { .uuid = uuid };
786         struct lustre_capa_key *key;
787         int i, rc = 0;
788
789         ENTRY;
790
791         if (!mds->mds_capa_keys)
792                 RETURN(0);
793
794         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
795         for (i = 0; i < 2; i++) {
796                 key = &mds->mds_capa_keys[i];
797                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
798
799                 info.capa = key;
800                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
801                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
802                 if (rc) {
803                         DEBUG_CAPA_KEY(D_ERROR, key,
804                                        "propagate failed (rc = %d) for", rc);
805                         RETURN(rc);
806                 }
807         }
808
809         RETURN(0);
810 }
811
812 /* We only sync one osc at a time, so that we don't have to hold
813    any kind of lock on the whole mds_lov_desc, which may change
814    (grow) as a result of mds_lov_add_ost.  This also avoids any
815    kind of mismatch between the lov_desc and the mds_lov_desc,
816    which are not in lock-step during lov_add_obd */
817 static int __mds_lov_synchronize(void *data)
818 {
819         struct mds_lov_sync_info *mlsi = data;
820         struct obd_device *obd = mlsi->mlsi_obd;
821         struct obd_device *watched = mlsi->mlsi_watched;
822         struct mds_obd *mds = &obd->u.mds;
823         struct obd_uuid *uuid;
824         __u32  idx = mlsi->mlsi_index;
825         struct mds_group_info mgi;
826         struct llog_ctxt *ctxt;
827         int rc = 0;
828         ENTRY;
829
830         OBD_FREE_PTR(mlsi);
831
832         LASSERT(obd);
833         LASSERT(watched);
834         uuid = &watched->u.cli.cl_target_uuid;
835         LASSERT(uuid);
836
837         cfs_down_read(&mds->mds_notify_lock);
838         if (obd->obd_stopping || obd->obd_fail)
839                 GOTO(out, rc = -ENODEV);
840
841         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
842         rc = mds_lov_update_mds(obd, watched, idx);
843         if (rc != 0) {
844                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
845                 GOTO(out, rc);
846         }
847         mgi.group = mdt_to_obd_objseq(mds->mds_id);
848         mgi.uuid = uuid;
849
850         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
851                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
852         if (rc != 0)
853                 GOTO(out, rc);
854         /* propagate capability keys */
855         rc = mds_propagate_capa_keys(mds, uuid);
856         if (rc)
857                 GOTO(out, rc);
858
859         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
860         if (!ctxt)
861                 GOTO(out, rc = -ENODEV);
862
863         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
864         rc = llog_connect(ctxt, NULL, NULL, uuid);
865         llog_ctxt_put(ctxt);
866         if (rc != 0) {
867                 CERROR("%s failed at llog_origin_connect: %d\n",
868                        obd_uuid2str(uuid), rc);
869                 GOTO(out, rc);
870         }
871
872         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
873               obd->obd_name, obd_uuid2str(uuid));
874
875         rc = mds_lov_clear_orphans(mds, uuid);
876         if (rc != 0) {
877                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
878                        obd_uuid2str(uuid), rc);
879                 GOTO(out, rc);
880         }
881
882 #ifdef HAVE_QUOTA_SUPPORT
883         if (obd->obd_upcall.onu_owner) {
884                 /*
885                  * This is a hack for mds_notify->mdd_notify. When the mds obd
886                  * in mdd is removed, This hack should be removed.
887                  */
888                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
889                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
890                                                 obd->obd_upcall.onu_owner,NULL);
891         }
892 #endif
893         EXIT;
894 out:
895         cfs_up_read(&mds->mds_notify_lock);
896         if (rc) {
897                 /* Deactivate it for safety */
898                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
899                        rc);
900                 if (!obd->obd_stopping && mds->mds_lov_obd &&
901                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
902                         obd_notify(mds->mds_lov_obd, watched,
903                                    OBD_NOTIFY_INACTIVE, NULL);
904         }
905
906         class_decref(obd, "mds_lov_synchronize", obd);
907         return rc;
908 }
909
910 int mds_lov_synchronize(void *data)
911 {
912         struct mds_lov_sync_info *mlsi = data;
913         char name[20];
914
915         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
916         cfs_daemonize_ctxt(name);
917
918         RETURN(__mds_lov_synchronize(data));
919 }
920
921 int mds_lov_start_synchronize(struct obd_device *obd,
922                               struct obd_device *watched,
923                               void *data, enum obd_notify_event ev)
924 {
925         struct mds_lov_sync_info *mlsi;
926         int rc;
927         struct obd_uuid *uuid;
928         ENTRY;
929
930         LASSERT(watched);
931         uuid = &watched->u.cli.cl_target_uuid;
932
933         OBD_ALLOC(mlsi, sizeof(*mlsi));
934         if (mlsi == NULL)
935                 RETURN(-ENOMEM);
936
937         LASSERT(data);
938         mlsi->mlsi_obd = obd;
939         mlsi->mlsi_watched = watched;
940         mlsi->mlsi_index = *(__u32 *)data;
941
942         /* Although class_export_get(obd->obd_self_export) would lock
943            the MDS in place, since it's only a self-export
944            it doesn't lock the LOV in place.  The LOV can be disconnected
945            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
946            Simply taking an export ref on the LOV doesn't help, because it's
947            still disconnected. Taking an obd reference insures that we don't
948            disconnect the LOV.  This of course means a cleanup won't
949            finish for as long as the sync is blocking. */
950         class_incref(obd, "mds_lov_synchronize", obd);
951
952         if (ev != OBD_NOTIFY_SYNC) {
953                 /* Synchronize in the background */
954                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
955                                        CFS_DAEMON_FLAGS);
956                 if (rc < 0) {
957                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
958                                obd->obd_name, rc);
959                         class_decref(obd, "mds_lov_synchronize", obd);
960                 } else {
961                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
962                                "thread=%d\n", obd->obd_name,
963                                mlsi->mlsi_index, rc);
964                         rc = 0;
965                 }
966         } else {
967                 rc = __mds_lov_synchronize((void *)mlsi);
968         }
969
970         RETURN(rc);
971 }
972
973 int mds_notify(struct obd_device *obd, struct obd_device *watched,
974                enum obd_notify_event ev, void *data)
975 {
976         struct mds_obd *mds = &obd->u.mds;
977         int rc = 0;
978         ENTRY;
979
980         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
981
982         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
983                 CERROR("unexpected notification of %s %s!\n",
984                        watched->obd_type->typ_name, watched->obd_name);
985                 RETURN(-EINVAL);
986         }
987
988         /*XXX this notifies the MDD until lov handling use old mds code
989          * must non block!
990          */
991         if (obd->obd_upcall.onu_owner) {
992                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
993                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
994                                                  obd->obd_upcall.onu_owner,
995                                                  &mds->mds_obt.obt_mount_count);
996         }
997
998         switch (ev) {
999         /* We only handle these: */
1000         case OBD_NOTIFY_CREATE:
1001                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1002                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1003                 /* We still have to fix the lov descriptor for ost's */
1004                 LASSERT(data);
1005                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1006                                           &watched->u.cli.cl_target_uuid);
1007                 RETURN(rc);
1008         case OBD_NOTIFY_ACTIVE:
1009                 /* lov want one or more _active_ targets for work */
1010                 /* activate event should be pass lov idx as argument */
1011         case OBD_NOTIFY_SYNC:
1012         case OBD_NOTIFY_SYNC_NONBLOCK:
1013                 /* sync event should be pass lov idx as argument */
1014                 break;
1015         default:
1016                 RETURN(0);
1017         }
1018
1019         if (obd->obd_recovering) {
1020                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1021                       obd->obd_name,
1022                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1023                 /* We still have to fix the lov descriptor for ost's added
1024                    after the mdt in the config log.  They didn't make it into
1025                    mds_lov_connect. */
1026                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1027                                          &watched->u.cli.cl_target_uuid);
1028         } else {
1029                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1030         }
1031         RETURN(rc);
1032 }