Whamcloud - gitweb
b=21379 Fix orphans proceeding in osc_create
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty =
104                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
105         if (mds->mds_lov_page_dirty == NULL)
106                 RETURN(-ENOMEM);
107
108
109         OBD_ALLOC(mds->mds_lov_page_array, size);
110         if (mds->mds_lov_page_array == NULL)
111                 GOTO(err_free_bitmap, rc = -ENOMEM);
112
113         /* open and test the lov objd file */
114         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
115         if (IS_ERR(file)) {
116                 rc = PTR_ERR(file);
117                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
118                 GOTO(err_free, rc = PTR_ERR(file));
119         }
120         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
121                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
122                        file->f_dentry->d_inode->i_mode);
123                 GOTO(err_open, rc = -ENOENT);
124         }
125         mds->mds_lov_objid_filp = file;
126
127         RETURN (0);
128 err_open:
129         if (filp_close((struct file *)file, 0))
130                 CERROR("can't close %s after error\n", LOV_OBJID);
131 err_free:
132         OBD_FREE(mds->mds_lov_page_array, size);
133 err_free_bitmap:
134         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
135
136         RETURN(rc);
137 }
138
139 void mds_lov_destroy_objids(struct obd_device *obd)
140 {
141         struct mds_obd *mds = &obd->u.mds;
142         int i, rc;
143         ENTRY;
144
145         if (mds->mds_lov_page_array != NULL) {
146                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
147                         obd_id *data = mds->mds_lov_page_array[i];
148                         if (data != NULL)
149                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
150                 }
151                 OBD_FREE(mds->mds_lov_page_array,
152                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
153         }
154
155         if (mds->mds_lov_objid_filp) {
156                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
157                 mds->mds_lov_objid_filp = NULL;
158                 if (rc)
159                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
160         }
161
162         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
163         EXIT;
164 }
165
166 /**
167  * currently exist two ways for know about ost count and max ost index.
168  * first - after ost is connected to mds and sync process finished
169  * second - get from lmm in recovery process, in case when mds not have configs,
170  * and ost isn't registered in mgs.
171  *
172  * \param mds pointer to mds structure
173  * \param index maxium ost index
174  *
175  * \retval -ENOMEM is not hame memory for new page
176  * \retval 0 is update passed
177  */
178 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
179 {
180         __u32 page = index / OBJID_PER_PAGE();
181         __u32 off = index % OBJID_PER_PAGE();
182         obd_id *data =  mds->mds_lov_page_array[page];
183
184         if (data == NULL) {
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         RETURN(-ENOMEM);
188
189                 mds->mds_lov_page_array[page] = data;
190         }
191
192         if (index > mds->mds_lov_objid_max_index) {
193                 mds->mds_lov_objid_lastpage = page;
194                 mds->mds_lov_objid_lastidx = off;
195                 mds->mds_lov_objid_max_index = index;
196         }
197
198         /* workaround - New target not in objids file; increase mdsize */
199         /* ld_tgt_count is used as the max index everywhere, despite its name. */
200         if (data[off] == 0) {
201                 __u32 stripes;
202
203                 data[off] = 1;
204                 mds->mds_lov_objid_count++;
205                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
206                                 mds->mds_lov_objid_count);
207
208                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
209                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210
211                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
212                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
213                        mds->mds_max_cookiesize);
214         }
215
216         EXIT;
217         return 0;
218 }
219
220 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
221 {
222         struct lov_ost_data_v1 *data;
223         __u32 count;
224         int rc = 0;
225         __u32 j;
226
227         /* if we create file without objects - lmm is NULL */
228         if (lmm == NULL)
229                 return 0;
230
231         switch (le32_to_cpu(lmm->lmm_magic)) {
232                 case LOV_MAGIC_V1:
233                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
234                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
235                         break;
236                 case LOV_MAGIC_V3:
237                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
238                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
239                         break;
240                 default:
241                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
242                         RETURN(-EINVAL);
243         }
244
245
246         cfs_mutex_down(&obd->obd_dev_sem);
247         for (j = 0; j < count; j++) {
248                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
249                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
250                         rc = -ENOMEM;
251                         break;
252                 }
253         }
254         cfs_mutex_up(&obd->obd_dev_sem);
255
256         RETURN(rc);
257 }
258 EXPORT_SYMBOL(mds_lov_prepare_objids);
259
260 /*
261  * write llog orphan record about lost ost object,
262  * Special lsm is allocated with single stripe, caller should deallocated it
263  * after use
264  */
265 static int mds_log_lost_precreated(struct obd_device *obd,
266                                    struct lov_stripe_md **lsmp, int *stripes,
267                                    obd_id id, obd_count count, int idx)
268 {
269         struct lov_stripe_md *lsm = *lsmp;
270         int rc;
271         ENTRY;
272
273         if (*lsmp == NULL) {
274                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
275                 if (rc < 0)
276                         RETURN(rc);
277                 /* need only one stripe, save old value */
278                 *stripes = lsm->lsm_stripe_count;
279                 lsm->lsm_stripe_count = 1;
280                 *lsmp = lsm;
281         }
282
283         lsm->lsm_oinfo[0]->loi_id = id;
284         lsm->lsm_oinfo[0]->loi_gr = mdt_to_obd_objgrp(obd->u.mds.mds_id);
285         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
286
287         rc = mds_log_op_orphan(obd, lsm, count);
288         RETURN(rc);
289 }
290
291 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
292 {
293         struct mds_obd *mds = &obd->u.mds;
294         int j;
295         struct lov_ost_data_v1 *obj;
296         struct lov_stripe_md *lsm = NULL;
297         int stripes = 0;
298         int count;
299         ENTRY;
300
301         /* if we create file without objects - lmm is NULL */
302         if (lmm == NULL)
303                 return;
304
305         switch (le32_to_cpu(lmm->lmm_magic)) {
306                 case LOV_MAGIC_V1:
307                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
308                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
309                         break;
310                 case LOV_MAGIC_V3:
311                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
312                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
313                         break;
314                 default:
315                         CERROR("Unknow lmm type %X !\n",
316                                le32_to_cpu(lmm->lmm_magic));
317                         return;
318         }
319
320         for (j = 0; j < count; j++) {
321                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
322                 obd_id id = le64_to_cpu(obj[j].l_object_id);
323                 __u32 page = i / OBJID_PER_PAGE();
324                 __u32 idx = i % OBJID_PER_PAGE();
325                 obd_id *data;
326
327                 data = mds->mds_lov_page_array[page];
328
329                 CDEBUG(D_INODE,"update last object for ost %u"
330                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
331                 if (id > data[idx]) {
332                         int lost = id - data[idx] - 1;
333                         /* we might have lost precreated objects due to VBR */
334                         if (lost > 0 && obd->obd_recovering) {
335                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
336                                 if (!obd->obd_version_recov)
337                                         CERROR("Unexpected gap in objids\n");
338                                 /* lsm is allocated if NULL */
339                                 mds_log_lost_precreated(obd, &lsm, &stripes,
340                                                         data[idx]+1, lost, i);
341                         }
342                         data[idx] = id;
343                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
344                 }
345         }
346         if (lsm) {
347                 /* restore stripes number */
348                 lsm->lsm_stripe_count = stripes;
349                 obd_free_memmd(mds->mds_lov_exp, &lsm);
350         }
351         EXIT;
352         return;
353 }
354 EXPORT_SYMBOL(mds_lov_update_objids);
355
356 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
357                                     __u32 count)
358 {
359         __u32 i;
360         __u32 stripes;
361
362         for (i = 0; i < count; i++) {
363                 if (data[i] == 0)
364                         continue;
365
366                 mds->mds_lov_objid_count++;
367         }
368
369         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
370                          mds->mds_lov_objid_count);
371
372         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
373         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
374
375         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
376                "%d/%d\n", stripes, mds->mds_max_mdsize,
377                mds->mds_max_cookiesize);
378
379         EXIT;
380         return 0;
381 }
382
383 static int mds_lov_read_objids(struct obd_device *obd)
384 {
385         struct mds_obd *mds = &obd->u.mds;
386         loff_t off = 0;
387         int i, rc = 0, count = 0, page = 0;
388         unsigned long size;
389         ENTRY;
390
391         /* Read everything in the file, even if our current lov desc
392            has fewer targets. Old targets not in the lov descriptor
393            during mds setup may still have valid objids. */
394         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
395         if (size == 0)
396                 RETURN(0);
397
398         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
399         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
400         for (i = 0; i < page; i++) {
401                 obd_id *data;
402                 loff_t off_old = off;
403
404                 LASSERT(mds->mds_lov_page_array[i] == NULL);
405                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
406                 if (mds->mds_lov_page_array[i] == NULL)
407                         GOTO(out, rc = -ENOMEM);
408
409                 data = mds->mds_lov_page_array[i];
410
411                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
412                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
413                 if (rc < 0) {
414                         CERROR("Error reading objids %d\n", rc);
415                         GOTO(out, rc);
416                 }
417
418                 count += (off - off_old) / sizeof(obd_id);
419                 if (mds_lov_update_from_read(mds, data, count)) {
420                         CERROR("Can't update mds data\n");
421                         GOTO(out, rc = -EIO);
422                 }
423
424                 if (off == off_old)
425                         break; /* eof */
426         }
427         mds->mds_lov_objid_lastpage = i;
428         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
429
430         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
431                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
432 out:
433         mds_lov_dump_objids("read",obd);
434
435         RETURN(rc);
436 }
437
438 int mds_lov_write_objids(struct obd_device *obd)
439 {
440         struct mds_obd *mds = &obd->u.mds;
441         int i = 0, rc = 0;
442         ENTRY;
443
444         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
445                 RETURN(0);
446
447         mds_lov_dump_objids("write", obd);
448
449         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
450                 obd_id *data =  mds->mds_lov_page_array[i];
451                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
452                 loff_t off = i * size;
453
454                 LASSERT(data != NULL);
455
456                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
457                         continue;
458
459                 /* check for particaly filled last page */
460                 if (i == mds->mds_lov_objid_lastpage)
461                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
462
463                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
464                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
465                                          size, &off, 0);
466                 if (rc < 0) {
467                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
468                         break;
469                 }
470         }
471         if (rc >= 0)
472                 rc = 0;
473
474         RETURN(rc);
475 }
476 EXPORT_SYMBOL(mds_lov_write_objids);
477
478 static int mds_lov_get_objid(struct obd_device * obd,
479                              obd_id idx)
480 {
481         struct mds_obd *mds = &obd->u.mds;
482         struct obd_export *lov_exp = mds->mds_lov_exp;
483         unsigned int page;
484         unsigned int off;
485         obd_id *data;
486         __u64 connect_flags;
487         __u32 size;
488         int rc = 0;
489         ENTRY;
490
491         page = idx / OBJID_PER_PAGE();
492         off = idx % OBJID_PER_PAGE();
493         data = mds->mds_lov_page_array[page];
494
495         size = sizeof(__u64);
496         connect_flags = idx;
497         rc = obd_get_info(lov_exp, sizeof(KEY_CONNECT_FLAG), KEY_CONNECT_FLAG,
498                           &size, &connect_flags, NULL);
499         if (rc)
500                 GOTO(out, rc);
501
502         if (data[off] < 2 || connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
503                 /* We never read this lastid; ask the osc */
504                 struct obd_id_info lastid;
505
506                 size = sizeof(lastid);
507                 lastid.idx = idx;
508                 lastid.data = &data[off];
509                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
510                                   &size, &lastid, NULL);
511                 if (rc)
512                         GOTO(out, rc);
513
514                 /* workaround for clean filter */
515                 if (data[off] == 0)
516                         data[off] = 1;
517
518                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
519         }
520         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
521                idx, data, page, off, data[off]);
522 out:
523         RETURN(rc);
524 }
525
526 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
527 {
528         int rc;
529         struct obdo oa = { 0 };
530         struct obd_trans_info oti = {0};
531         struct lov_stripe_md  *empty_ea = NULL;
532         ENTRY;
533
534         LASSERT(mds->mds_lov_page_array != NULL);
535
536         /* This create will in fact either create or destroy:  If the OST is
537          * missing objects below this ID, they will be created.  If it finds
538          * objects above this ID, they will be removed. */
539         memset(&oa, 0, sizeof(oa));
540         oa.o_flags = OBD_FL_DELORPHAN;
541         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
542         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
543         if (ost_uuid != NULL)
544                 oti.oti_ost_uuid = ost_uuid;
545
546         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
547
548         RETURN(rc);
549 }
550
551 /* for one target */
552 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
553 {
554         struct mds_obd *mds = &obd->u.mds;
555         int rc;
556         struct obd_id_info info;
557         ENTRY;
558
559         LASSERT(!obd->obd_recovering);
560
561         info.idx = idx;
562         info.data = id;
563         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
564                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
565         if (rc)
566                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
567                         obd->obd_name, rc);
568
569         RETURN(rc);
570 }
571
572 /* Update the lov desc for a new size lov. */
573 static int mds_lov_update_desc(struct obd_device *obd, int idx,
574                                struct obd_uuid *uuid, enum obd_notify_event ev)
575 {
576         struct mds_obd *mds = &obd->u.mds;
577         struct lov_desc *ld;
578         __u32 valsize = sizeof(mds->mds_lov_desc);
579         int rc = 0;
580         ENTRY;
581
582         OBD_ALLOC(ld, sizeof(*ld));
583         if (!ld)
584                 RETURN(-ENOMEM);
585
586         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
587                           &valsize, ld, NULL);
588         if (rc)
589                 GOTO(out, rc);
590
591         /* Don't change the mds_lov_desc until the objids size matches the
592            count (paranoia) */
593         mds->mds_lov_desc = *ld;
594         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
595                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
596
597         cfs_mutex_down(&obd->obd_dev_sem);
598         rc = mds_lov_update_max_ost(mds, idx);
599         cfs_mutex_up(&obd->obd_dev_sem);
600         if (rc != 0)
601                 GOTO(out, rc );
602
603         /* If we added a target we have to reconnect the llogs */
604         /* We only _need_ to do this at first add (idx), or the first time
605            after recovery.  However, it should now be safe to call anytime. */
606         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
607         if (rc)
608                 GOTO(out, rc);
609
610         /*XXX this notifies the MDD until lov handling use old mds code */
611         if (obd->obd_upcall.onu_owner) {
612                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
613                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
614                                                  obd->obd_upcall.onu_owner,
615                                                  &mds->mds_mount_count);
616         }
617 out:
618         OBD_FREE(ld, sizeof(*ld));
619         RETURN(rc);
620 }
621
622 /* Inform MDS about new/updated target */
623 static int mds_lov_update_mds(struct obd_device *obd,
624                               struct obd_device *watched,
625                               __u32 idx, enum obd_notify_event ev)
626 {
627         struct mds_obd *mds = &obd->u.mds;
628         int rc = 0;
629         int page;
630         int off;
631         obd_id *data;
632
633         ENTRY;
634
635         /* Don't let anyone else mess with mds_lov_objids now */
636         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
637         if (rc)
638                 GOTO(out, rc);
639
640         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
641                idx, obd->obd_recovering, obd->obd_async_recov,
642                mds->mds_lov_desc.ld_tgt_count);
643
644         /* idx is set as data from lov_notify. */
645         if (obd->obd_recovering)
646                 GOTO(out, rc);
647
648         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
649                 CERROR("index %d > count %d!\n", idx,
650                        mds->mds_lov_desc.ld_tgt_count);
651                 GOTO(out, rc = -EINVAL);
652         }
653
654         rc = mds_lov_get_objid(obd, idx);
655         if (rc)
656                 GOTO(out, rc);
657
658         page = idx / OBJID_PER_PAGE();
659         off = idx % OBJID_PER_PAGE();
660         data = mds->mds_lov_page_array[page];
661
662         /* We have read this lastid from disk; tell the osc.
663            Don't call this during recovery. */
664         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
665         if (rc) {
666                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
667                 /* Don't abort the rest of the sync */
668                 rc = 0;
669         } else {
670                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
671                         data[off], idx, rc);
672         }
673 out:
674         RETURN(rc);
675 }
676
677 /* update the LOV-OSC knowledge of the last used object id's */
678 int mds_lov_connect(struct obd_device *obd, char * lov_name)
679 {
680         struct mds_obd *mds = &obd->u.mds;
681         struct obd_connect_data *data;
682         int rc;
683         ENTRY;
684
685         if (IS_ERR(mds->mds_lov_obd))
686                 RETURN(PTR_ERR(mds->mds_lov_obd));
687
688         if (mds->mds_lov_obd)
689                 RETURN(0);
690
691         mds->mds_lov_obd = class_name2obd(lov_name);
692         if (!mds->mds_lov_obd) {
693                 CERROR("MDS cannot locate LOV %s\n", lov_name);
694                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
695                 RETURN(-ENOTCONN);
696         }
697
698         cfs_mutex_down(&obd->obd_dev_sem);
699         rc = mds_lov_read_objids(obd);
700         cfs_mutex_up(&obd->obd_dev_sem);
701         if (rc) {
702                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
703                 GOTO(err_exit, rc);
704         }
705
706         rc = obd_register_observer(mds->mds_lov_obd, obd);
707         if (rc) {
708                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
709                        lov_name, rc);
710                 GOTO(err_exit, rc);
711         }
712
713         /* try init too early */
714         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
715         if (rc)
716                 GOTO(err_exit, rc);
717
718         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
719
720         OBD_ALLOC(data, sizeof(*data));
721         if (data == NULL)
722                 GOTO(err_exit, rc = -ENOMEM);
723
724         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
725                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
726                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
727                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
728                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
729                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
730                                   OBD_CONNECT_SOM;
731 #ifdef HAVE_LRU_RESIZE_SUPPORT
732         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
733 #endif
734         data->ocd_version = LUSTRE_VERSION_CODE;
735         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
736         /* send max bytes per rpc */
737         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
738         /* send the list of supported checksum types */
739         data->ocd_cksum_types = OBD_CKSUM_ALL;
740         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
741         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
742         OBD_FREE(data, sizeof(*data));
743         if (rc) {
744                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
745                 mds->mds_lov_obd = ERR_PTR(rc);
746                 RETURN(rc);
747         }
748
749         /* I want to see a callback happen when the OBD moves to a
750          * "For General Use" state, and that's when we'll call
751          * set_nextid().  The class driver can help us here, because
752          * it can use the obd_recovering flag to determine when the
753          * the OBD is full available. */
754         /* MDD device will care about that
755         if (!obd->obd_recovering)
756                 rc = mds_postrecov(obd);
757          */
758         RETURN(rc);
759
760 err_exit:
761         mds->mds_lov_exp = NULL;
762         mds->mds_lov_obd = ERR_PTR(rc);
763         RETURN(rc);
764 }
765
766 int mds_lov_disconnect(struct obd_device *obd)
767 {
768         struct mds_obd *mds = &obd->u.mds;
769         int rc = 0;
770         ENTRY;
771
772         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
773                 obd_register_observer(mds->mds_lov_obd, NULL);
774
775                 /* The actual disconnect of the mds_lov will be called from
776                  * class_disconnect_exports from mds_lov_clean. So we have to
777                  * ensure that class_cleanup doesn't fail due to the extra ref
778                  * we're holding now. The mechanism to do that already exists -
779                  * the obd_force flag. We'll drop the final ref to the
780                  * mds_lov_exp in mds_cleanup. */
781                 mds->mds_lov_obd->obd_force = 1;
782         }
783
784         RETURN(rc);
785 }
786
787 struct mds_lov_sync_info {
788         struct obd_device    *mlsi_obd;     /* the lov device to sync */
789         struct obd_device    *mlsi_watched; /* target osc */
790         __u32                 mlsi_index;   /* index of target */
791         enum obd_notify_event mlsi_ev;      /* event type */
792 };
793
794 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
795 {
796         struct mds_capa_info    info = { .uuid = uuid };
797         struct lustre_capa_key *key;
798         int i, rc = 0;
799
800         ENTRY;
801
802         if (!mds->mds_capa_keys)
803                 RETURN(0);
804
805         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
806         for (i = 0; i < 2; i++) {
807                 key = &mds->mds_capa_keys[i];
808                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
809
810                 info.capa = key;
811                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
812                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
813                 if (rc) {
814                         DEBUG_CAPA_KEY(D_ERROR, key,
815                                        "propagate failed (rc = %d) for", rc);
816                         RETURN(rc);
817                 }
818         }
819
820         RETURN(0);
821 }
822
823 /* We only sync one osc at a time, so that we don't have to hold
824    any kind of lock on the whole mds_lov_desc, which may change
825    (grow) as a result of mds_lov_add_ost.  This also avoids any
826    kind of mismatch between the lov_desc and the mds_lov_desc,
827    which are not in lock-step during lov_add_obd */
828 static int __mds_lov_synchronize(void *data)
829 {
830         struct mds_lov_sync_info *mlsi = data;
831         struct obd_device *obd = mlsi->mlsi_obd;
832         struct obd_device *watched = mlsi->mlsi_watched;
833         struct mds_obd *mds = &obd->u.mds;
834         struct obd_uuid *uuid;
835         __u32  idx = mlsi->mlsi_index;
836         enum obd_notify_event ev = mlsi->mlsi_ev;
837         struct mds_group_info mgi;
838         struct llog_ctxt *ctxt;
839         int rc = 0;
840         ENTRY;
841
842         OBD_FREE_PTR(mlsi);
843
844         LASSERT(obd);
845         LASSERT(watched);
846         uuid = &watched->u.cli.cl_target_uuid;
847         LASSERT(uuid);
848
849         cfs_down_read(&mds->mds_notify_lock);
850         if (obd->obd_stopping || obd->obd_fail)
851                 GOTO(out, rc = -ENODEV);
852
853         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
854         rc = mds_lov_update_mds(obd, watched, idx, ev);
855         if (rc != 0) {
856                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
857                 GOTO(out, rc);
858         }
859         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
860         mgi.uuid = uuid;
861
862         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
863                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
864         if (rc != 0)
865                 GOTO(out, rc);
866         /* propagate capability keys */
867         rc = mds_propagate_capa_keys(mds, uuid);
868         if (rc)
869                 GOTO(out, rc);
870
871         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
872         if (!ctxt)
873                 GOTO(out, rc = -ENODEV);
874
875         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
876         rc = llog_connect(ctxt, NULL, NULL, uuid);
877         llog_ctxt_put(ctxt);
878         if (rc != 0) {
879                 CERROR("%s failed at llog_origin_connect: %d\n",
880                        obd_uuid2str(uuid), rc);
881                 GOTO(out, rc);
882         }
883
884         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
885               obd->obd_name, obd_uuid2str(uuid));
886         rc = mds_lov_clear_orphans(mds, uuid);
887         if (rc != 0) {
888                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
889                        obd_uuid2str(uuid), rc);
890                 GOTO(out, rc);
891         }
892
893 #ifdef HAVE_QUOTA_SUPPORT
894         if (obd->obd_upcall.onu_owner) { 
895                 /*
896                  * This is a hack for mds_notify->mdd_notify. When the mds obd
897                  * in mdd is removed, This hack should be removed.
898                  */
899                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
900                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
901                                                 obd->obd_upcall.onu_owner,NULL);
902         }
903 #endif
904         EXIT;
905 out:
906         cfs_up_read(&mds->mds_notify_lock);
907         if (rc) {
908                 /* Deactivate it for safety */
909                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
910                        rc);
911                 if (!obd->obd_stopping && mds->mds_lov_obd &&
912                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
913                         obd_notify(mds->mds_lov_obd, watched,
914                                    OBD_NOTIFY_INACTIVE, NULL);
915         }
916
917         class_decref(obd, "mds_lov_synchronize", obd);
918         return rc;
919 }
920
921 int mds_lov_synchronize(void *data)
922 {
923         struct mds_lov_sync_info *mlsi = data;
924         char name[20];
925
926         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
927         cfs_daemonize_ctxt(name);
928
929         RETURN(__mds_lov_synchronize(data));
930 }
931
932 int mds_lov_start_synchronize(struct obd_device *obd,
933                               struct obd_device *watched,
934                               void *data, enum obd_notify_event ev)
935 {
936         struct mds_lov_sync_info *mlsi;
937         int rc;
938         struct obd_uuid *uuid;
939         ENTRY;
940
941         LASSERT(watched);
942         uuid = &watched->u.cli.cl_target_uuid;
943
944         OBD_ALLOC(mlsi, sizeof(*mlsi));
945         if (mlsi == NULL)
946                 RETURN(-ENOMEM);
947
948         LASSERT(data);
949         mlsi->mlsi_obd = obd;
950         mlsi->mlsi_watched = watched;
951         mlsi->mlsi_index = *(__u32 *)data;
952         mlsi->mlsi_ev = ev;
953
954         /* Although class_export_get(obd->obd_self_export) would lock
955            the MDS in place, since it's only a self-export
956            it doesn't lock the LOV in place.  The LOV can be disconnected
957            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
958            Simply taking an export ref on the LOV doesn't help, because it's
959            still disconnected. Taking an obd reference insures that we don't
960            disconnect the LOV.  This of course means a cleanup won't
961            finish for as long as the sync is blocking. */
962         class_incref(obd, "mds_lov_synchronize", obd);
963
964         if (ev != OBD_NOTIFY_SYNC) {
965                 /* Synchronize in the background */
966                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
967                                        CLONE_VM | CLONE_FILES);
968                 if (rc < 0) {
969                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
970                                obd->obd_name, rc);
971                         class_decref(obd, "mds_lov_synchronize", obd);
972                 } else {
973                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
974                                "thread=%d\n", obd->obd_name,
975                                mlsi->mlsi_index, rc);
976                         rc = 0;
977                 }
978         } else {
979                 rc = __mds_lov_synchronize((void *)mlsi);
980         }
981
982         RETURN(rc);
983 }
984
985 int mds_notify(struct obd_device *obd, struct obd_device *watched,
986                enum obd_notify_event ev, void *data)
987 {
988         int rc = 0;
989         ENTRY;
990
991         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
992
993         switch (ev) {
994         /* We only handle these: */
995         case OBD_NOTIFY_ACTIVE:
996                 /* lov want one or more _active_ targets for work */
997                 /* activate event should be pass lov idx as argument */
998         case OBD_NOTIFY_SYNC:
999         case OBD_NOTIFY_SYNC_NONBLOCK:
1000                 /* sync event should be pass lov idx as argument */
1001                 break;
1002         default:
1003                 RETURN(0);
1004         }
1005
1006         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1007                 CERROR("unexpected notification of %s %s!\n",
1008                        watched->obd_type->typ_name, watched->obd_name);
1009                 RETURN(-EINVAL);
1010         }
1011
1012         if (obd->obd_recovering) {
1013                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1014                       obd->obd_name,
1015                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1016                 /* We still have to fix the lov descriptor for ost's added
1017                    after the mdt in the config log.  They didn't make it into
1018                    mds_lov_connect. */
1019                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1020                                          &watched->u.cli.cl_target_uuid, ev);
1021         } else {
1022                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1023         }
1024         RETURN(rc);
1025 }