Whamcloud - gitweb
51b2fbc63ea287053c8d974278e1047f36e4cb81
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 skip_bitmap:
70         if (mds->mds_lov_page_array == NULL) {
71                 CERROR("not init page array!\n");
72                 GOTO(skip_array, i);
73
74         }
75         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
76                 obd_id *data = mds->mds_lov_page_array[i];
77
78                 if (data == NULL)
79                         continue;
80
81                 for(j=0; j < OBJID_PER_PAGE(); j++) {
82                         if (data[j] == 0)
83                                 continue;
84                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
85                 }
86         }
87 skip_array:
88         EXIT;
89 }
90
91 int mds_lov_init_objids(struct obd_device *obd)
92 {
93         struct mds_obd *mds = &obd->u.mds;
94         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
95         struct file *file;
96         int rc;
97         ENTRY;
98
99         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100
101         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
102         if (mds->mds_lov_page_dirty == NULL)
103                 RETURN(-ENOMEM);
104
105
106         OBD_ALLOC(mds->mds_lov_page_array, size);
107         if (mds->mds_lov_page_array == NULL)
108                 GOTO(err_free_bitmap, rc = -ENOMEM);
109
110         /* open and test the lov objd file */
111         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
112         if (IS_ERR(file)) {
113                 rc = PTR_ERR(file);
114                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
115                 GOTO(err_free, rc = PTR_ERR(file));
116         }
117         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
118                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
119                        file->f_dentry->d_inode->i_mode);
120                 GOTO(err_open, rc = -ENOENT);
121         }
122         mds->mds_lov_objid_filp = file;
123
124         RETURN (0);
125 err_open:
126         if (filp_close((struct file *)file, 0))
127                 CERROR("can't close %s after error\n", LOV_OBJID);
128 err_free:
129         OBD_FREE(mds->mds_lov_page_array, size);
130 err_free_bitmap:
131         FREE_BITMAP(mds->mds_lov_page_dirty);
132
133         RETURN(rc);
134 }
135
136 void mds_lov_destroy_objids(struct obd_device *obd)
137 {
138         struct mds_obd *mds = &obd->u.mds;
139         int i, rc;
140         ENTRY;
141
142         if (mds->mds_lov_page_array != NULL) {
143                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
144                         obd_id *data = mds->mds_lov_page_array[i];
145                         if (data != NULL)
146                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
147                 }
148                 OBD_FREE(mds->mds_lov_page_array,
149                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
150         }
151
152         if (mds->mds_lov_objid_filp) {
153                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
154                 mds->mds_lov_objid_filp = NULL;
155                 if (rc)
156                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
157         }
158
159         FREE_BITMAP(mds->mds_lov_page_dirty);
160         EXIT;
161 }
162
163 /**
164  * currently exist two ways for know about ost count and max ost index.
165  * first - after ost is connected to mds and sync process finished
166  * second - get from lmm in recovery process, in case when mds not have configs,
167  * and ost isn't registered in mgs.
168  *
169  * \param mds pointer to mds structure
170  * \param index maxium ost index
171  *
172  * \retval -ENOMEM is not hame memory for new page
173  * \retval 0 is update passed
174  */
175 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
176 {
177         __u32 page = index / OBJID_PER_PAGE();
178         __u32 off = index % OBJID_PER_PAGE();
179         obd_id *data =  mds->mds_lov_page_array[page];
180
181         if (data == NULL) {
182                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
183                 if (data == NULL)
184                         RETURN(-ENOMEM);
185
186                 mds->mds_lov_page_array[page] = data;
187         }
188
189         if (index > mds->mds_lov_objid_max_index) {
190                 mds->mds_lov_objid_lastpage = page;
191                 mds->mds_lov_objid_lastidx = off;
192                 mds->mds_lov_objid_max_index = index;
193         }
194
195         /* workaround - New target not in objids file; increase mdsize */
196         /* ld_tgt_count is used as the max index everywhere, despite its name. */
197         if (data[off] == 0) {
198                 __u32 stripes;
199
200                 data[off] = 1;
201                 mds->mds_lov_objid_count++;
202                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
203                                 mds->mds_lov_objid_count);
204
205                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
206                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
207
208                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
209                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
210                        mds->mds_max_cookiesize);
211         }
212
213         EXIT;
214         return 0;
215 }
216
217 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
218 {
219         struct lov_ost_data_v1 *data;
220         __u32 count;
221         int rc = 0;
222         __u32 j;
223
224         /* if we create file without objects - lmm is NULL */
225         if (lmm == NULL)
226                 return 0;
227
228         switch (le32_to_cpu(lmm->lmm_magic)) {
229                 case LOV_MAGIC_V1:
230                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
231                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
232                         break;
233                 case LOV_MAGIC_V3:
234                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
235                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
236                         break;
237                 default:
238                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
239                         RETURN(-EINVAL);
240         }
241
242
243         mutex_down(&obd->obd_dev_sem);
244         for (j = 0; j < count; j++) {
245                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
246                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
247                         rc = -ENOMEM;
248                         break;
249                 }
250         }
251         mutex_up(&obd->obd_dev_sem);
252
253         RETURN(rc);
254 }
255 EXPORT_SYMBOL(mds_lov_prepare_objids);
256
257 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
258 {
259         struct mds_obd *mds = &obd->u.mds;
260         int j;
261         struct lov_ost_data_v1 *obj;
262         int count;
263         ENTRY;
264
265         /* if we create file without objects - lmm is NULL */
266         if (lmm == NULL)
267                 return;
268
269         switch (le32_to_cpu(lmm->lmm_magic)) {
270                 case LOV_MAGIC_V1:
271                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
272                         obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
273                         break;
274                 case LOV_MAGIC_V3:
275                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
276                         obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
277                         break;
278                 default:
279                         CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
280                         return;
281         }
282
283         for (j = 0; j < count; j++) {
284                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
285                 obd_id id = le64_to_cpu(obj[j].l_object_id);
286                 __u32 page = i / OBJID_PER_PAGE();
287                 __u32 idx = i % OBJID_PER_PAGE();
288                 obd_id *data;
289
290                 data = mds->mds_lov_page_array[page];
291
292                 CDEBUG(D_INODE,"update last object for ost %u"
293                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
294                 if (id > data[idx]) {
295                         data[idx] = id;
296                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
297                 }
298         }
299         EXIT;
300         return;
301 }
302 EXPORT_SYMBOL(mds_lov_update_objids);
303
304
305 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
306                                     __u32 count)
307 {
308         __u32 i;
309         __u32 stripes;
310
311         for(i = 0; i < count; i++) {
312                 if (data[i] == 0)
313                         continue;
314
315                 mds->mds_lov_objid_count++;
316         }
317
318         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
319                          mds->mds_lov_objid_count);
320
321         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
322         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
323
324         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
325                "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
326
327         EXIT;
328         return 0;
329 }
330
331 static int mds_lov_read_objids(struct obd_device *obd)
332 {
333         struct mds_obd *mds = &obd->u.mds;
334         loff_t off = 0;
335         int i, rc, count = 0, page = 0;
336         unsigned long size;
337         ENTRY;
338
339         /* Read everything in the file, even if our current lov desc
340            has fewer targets. Old targets not in the lov descriptor
341            during mds setup may still have valid objids. */
342         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
343         if (size == 0)
344                 RETURN(0);
345
346         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
347         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
348
349         for (i = 0; i < page; i++) {
350                 loff_t off_old = off;
351
352                 LASSERT(mds->mds_lov_page_array[i] == NULL);
353                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
354                 if (mds->mds_lov_page_array[i] == NULL)
355                         GOTO(out, rc = -ENOMEM);
356
357                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
358                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
359                 if (rc < 0) {
360                         CERROR("Error reading objids %d\n", rc);
361                         GOTO(out, rc);
362                 }
363
364                 count += (off - off_old)/sizeof(obd_id);
365                 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
366                          CERROR("Can't update mds data\n");
367                          GOTO(out, rc = -EIO);
368                  }
369
370                 if (off == off_old)
371                         break; // eof
372         }
373         mds->mds_lov_objid_lastpage = i;
374         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
375
376         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
377                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
378 out:
379         mds_lov_dump_objids("read",obd);
380
381         RETURN(0);
382 }
383
384 int mds_lov_write_objids(struct obd_device *obd)
385 {
386         struct mds_obd *mds = &obd->u.mds;
387         int i, rc = 0;
388         ENTRY;
389
390         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
391                 RETURN(0);
392
393         mds_lov_dump_objids("write", obd);
394
395         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
396                 obd_id *data =  mds->mds_lov_page_array[i];
397                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
398                 loff_t off = i * size;
399
400                 LASSERT(data != NULL);
401
402                 /* check for particaly filled last page */
403                 if (i == mds->mds_lov_objid_lastpage)
404                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
405
406                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
407                                          size, &off, 0);
408                 if (rc < 0)
409                         break;
410                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
411         }
412         if (rc >= 0)
413                 rc = 0;
414
415         RETURN(rc);
416 }
417 EXPORT_SYMBOL(mds_lov_write_objids);
418
419 static int mds_lov_get_objid(struct obd_device * obd,
420                              obd_id idx)
421 {
422         struct mds_obd *mds = &obd->u.mds;
423         unsigned int page;
424         unsigned int off;
425         obd_id *data;
426         int rc = 0;
427         ENTRY;
428
429         page = idx / OBJID_PER_PAGE();
430         off = idx % OBJID_PER_PAGE();
431         data = mds->mds_lov_page_array[page];
432         if (data[off] < 2) {
433                 /* We never read this lastid; ask the osc */
434                 struct obd_id_info lastid;
435                 __u32 size = sizeof(lastid);
436
437                 lastid.idx = idx;
438                 lastid.data = &data[off];
439                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
440                                   KEY_LAST_ID, &size, &lastid, NULL);
441                 if (rc)
442                         GOTO(out, rc);
443
444                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
445         }
446 out:
447         RETURN(rc);
448 }
449
450 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
451 {
452         int rc;
453         struct obdo oa;
454         struct obd_trans_info oti = {0};
455         struct lov_stripe_md  *empty_ea = NULL;
456         ENTRY;
457
458         LASSERT(mds->mds_lov_page_array != NULL);
459
460         /* This create will in fact either create or destroy:  If the OST is
461          * missing objects below this ID, they will be created.  If it finds
462          * objects above this ID, they will be removed. */
463         memset(&oa, 0, sizeof(oa));
464         oa.o_flags = OBD_FL_DELORPHAN;
465         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
466         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
467         if (ost_uuid != NULL)
468                 oti.oti_ost_uuid = ost_uuid;
469         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
470
471         RETURN(rc);
472 }
473
474 /* for one target */
475 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
476 {
477         struct mds_obd *mds = &obd->u.mds;
478         int rc;
479         struct obd_id_info info;
480         ENTRY;
481
482         LASSERT(!obd->obd_recovering);
483
484         info.idx = idx;
485         info.data = id;
486         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
487                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
488         if (rc)
489                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
490                         obd->obd_name, rc);
491
492         RETURN(rc);
493 }
494
495 /* Update the lov desc for a new size lov. */
496 static int mds_lov_update_desc(struct obd_device *obd, int idx,
497                                struct obd_uuid *uuid)
498 {
499         struct mds_obd *mds = &obd->u.mds;
500         struct lov_desc *ld;
501         __u32 valsize = sizeof(mds->mds_lov_desc);
502         int rc = 0;
503         ENTRY;
504
505         OBD_ALLOC(ld, sizeof(*ld));
506         if (!ld)
507                 RETURN(-ENOMEM);
508
509         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
510                           &valsize, ld, NULL);
511         if (rc)
512                 GOTO(out, rc);
513
514         /* Don't change the mds_lov_desc until the objids size matches the
515            count (paranoia) */
516         mds->mds_lov_desc = *ld;
517         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
518                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
519
520         mutex_down(&obd->obd_dev_sem);
521         rc = mds_lov_update_max_ost(mds, idx);
522         mutex_up(&obd->obd_dev_sem);
523         if (rc != 0)
524                 GOTO(out, rc );
525
526
527         /* If we added a target we have to reconnect the llogs */
528         /* We only _need_ to do this at first add (idx), or the first time
529            after recovery.  However, it should now be safe to call anytime. */
530         rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
531         if (rc)
532                 GOTO(out, rc);
533
534         /*XXX this notifies the MDD until lov handling use old mds code */
535         if (obd->obd_upcall.onu_owner) {
536                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
537                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
538                                                  obd->obd_upcall.onu_owner);
539         }
540 out:
541         OBD_FREE(ld, sizeof(*ld));
542         RETURN(rc);
543 }
544
545 /* Inform MDS about new/updated target */
546 static int mds_lov_update_mds(struct obd_device *obd,
547                               struct obd_device *watched,
548                               __u32 idx)
549 {
550         struct mds_obd *mds = &obd->u.mds;
551         int rc = 0;
552         int page;
553         int off;
554         obd_id *data;
555
556         ENTRY;
557
558         /* Don't let anyone else mess with mds_lov_objids now */
559         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
560         if (rc)
561                 GOTO(out, rc);
562
563         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
564                idx, obd->obd_recovering, obd->obd_async_recov,
565                mds->mds_lov_desc.ld_tgt_count);
566
567         /* idx is set as data from lov_notify. */
568         if (obd->obd_recovering)
569                 GOTO(out, rc);
570
571         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
572                 CERROR("index %d > count %d!\n", idx,
573                        mds->mds_lov_desc.ld_tgt_count);
574                 GOTO(out, rc = -EINVAL);
575         }
576
577         rc = mds_lov_get_objid(obd, idx);
578         if (rc)
579                 GOTO(out, rc);
580
581         page = idx / OBJID_PER_PAGE();
582         off = idx % OBJID_PER_PAGE();
583         data = mds->mds_lov_page_array[page];
584
585         /* We have read this lastid from disk; tell the osc.
586            Don't call this during recovery. */
587         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
588         if (rc) {
589                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
590                 /* Don't abort the rest of the sync */
591                 rc = 0;
592         } else {
593                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
594                         data[off], idx, rc);
595         }
596 out:
597         RETURN(rc);
598 }
599
600 /* update the LOV-OSC knowledge of the last used object id's */
601 int mds_lov_connect(struct obd_device *obd, char * lov_name)
602 {
603         struct mds_obd *mds = &obd->u.mds;
604         struct obd_connect_data *data;
605         int rc;
606         ENTRY;
607
608         if (IS_ERR(mds->mds_osc_obd))
609                 RETURN(PTR_ERR(mds->mds_osc_obd));
610
611         if (mds->mds_osc_obd)
612                 RETURN(0);
613
614         mds->mds_osc_obd = class_name2obd(lov_name);
615         if (!mds->mds_osc_obd) {
616                 CERROR("MDS cannot locate LOV %s\n", lov_name);
617                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
618                 RETURN(-ENOTCONN);
619         }
620
621         mutex_down(&obd->obd_dev_sem);
622         rc = mds_lov_read_objids(obd);
623         mutex_up(&obd->obd_dev_sem);
624         if (rc) {
625                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
626                 GOTO(err_exit, rc);
627         }
628
629         rc = obd_register_observer(mds->mds_osc_obd, obd);
630         if (rc) {
631                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
632                        lov_name, rc);
633                 GOTO(err_exit, rc);
634         }
635
636         mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
637
638         OBD_ALLOC(data, sizeof(*data));
639         if (data == NULL)
640                 RETURN(-ENOMEM);
641         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
642                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
643                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
644                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
645                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
646                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN;
647 #ifdef HAVE_LRU_RESIZE_SUPPORT
648         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
649 #endif
650         data->ocd_version = LUSTRE_VERSION_CODE;
651         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
652         /* send max bytes per rpc */
653         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
654         /* send the list of supported checksum types */
655         data->ocd_cksum_types = OBD_CKSUM_ALL;
656         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
657         rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
658         OBD_FREE(data, sizeof(*data));
659         if (rc) {
660                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
661                 mds->mds_osc_obd = ERR_PTR(rc);
662                 RETURN(rc);
663         }
664
665         /* I want to see a callback happen when the OBD moves to a
666          * "For General Use" state, and that's when we'll call
667          * set_nextid().  The class driver can help us here, because
668          * it can use the obd_recovering flag to determine when the
669          * the OBD is full available. */
670         /* MDD device will care about that
671         if (!obd->obd_recovering)
672                 rc = mds_postrecov(obd);
673          */
674         RETURN(rc);
675
676 err_exit:
677         mds->mds_osc_exp = NULL;
678         mds->mds_osc_obd = ERR_PTR(rc);
679         RETURN(rc);
680 }
681
682 int mds_lov_disconnect(struct obd_device *obd)
683 {
684         struct mds_obd *mds = &obd->u.mds;
685         int rc = 0;
686         ENTRY;
687
688         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
689                 obd_register_observer(mds->mds_osc_obd, NULL);
690
691                 /* The actual disconnect of the mds_lov will be called from
692                  * class_disconnect_exports from mds_lov_clean. So we have to
693                  * ensure that class_cleanup doesn't fail due to the extra ref
694                  * we're holding now. The mechanism to do that already exists -
695                  * the obd_force flag. We'll drop the final ref to the
696                  * mds_osc_exp in mds_cleanup. */
697                 mds->mds_osc_obd->obd_force = 1;
698         }
699
700         RETURN(rc);
701 }
702
703 struct mds_lov_sync_info {
704         struct obd_device *mlsi_obd;     /* the lov device to sync */
705         struct obd_device *mlsi_watched; /* target osc */
706         __u32              mlsi_index;   /* index of target */
707 };
708
709 static int mds_propagate_capa_keys(struct mds_obd *mds)
710 {
711         struct lustre_capa_key *key;
712         int i, rc = 0;
713
714         ENTRY;
715
716         if (!mds->mds_capa_keys)
717                 RETURN(0);
718
719         for (i = 0; i < 2; i++) {
720                 key = &mds->mds_capa_keys[i];
721                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
722
723                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
724                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
725                 if (rc) {
726                         DEBUG_CAPA_KEY(D_ERROR, key,
727                                        "propagate failed (rc = %d) for", rc);
728                         RETURN(rc);
729                 }
730         }
731
732         RETURN(0);
733 }
734
735 /* We only sync one osc at a time, so that we don't have to hold
736    any kind of lock on the whole mds_lov_desc, which may change
737    (grow) as a result of mds_lov_add_ost.  This also avoids any
738    kind of mismatch between the lov_desc and the mds_lov_desc,
739    which are not in lock-step during lov_add_obd */
740 static int __mds_lov_synchronize(void *data)
741 {
742         struct mds_lov_sync_info *mlsi = data;
743         struct obd_device *obd = mlsi->mlsi_obd;
744         struct obd_device *watched = mlsi->mlsi_watched;
745         struct mds_obd *mds = &obd->u.mds;
746         struct obd_uuid *uuid;
747         __u32  idx = mlsi->mlsi_index;
748         struct mds_group_info mgi;
749         struct llog_ctxt *ctxt;
750         int rc = 0;
751         ENTRY;
752
753         OBD_FREE_PTR(mlsi);
754
755         LASSERT(obd);
756         LASSERT(watched);
757         uuid = &watched->u.cli.cl_target_uuid;
758         LASSERT(uuid);
759
760         down_read(&mds->mds_notify_lock);
761         if (obd->obd_stopping || obd->obd_fail)
762                 GOTO(out, rc = -ENODEV);
763
764         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
765         rc = mds_lov_update_mds(obd, watched, idx);
766         if (rc != 0) {
767                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
768                 GOTO(out, rc);
769         }
770         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
771         mgi.uuid = uuid;
772
773         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
774                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
775         if (rc != 0)
776                 GOTO(out, rc);
777         /* propagate capability keys */
778         rc = mds_propagate_capa_keys(mds);
779         if (rc)
780                 GOTO(out, rc);
781
782         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
783         if (!ctxt)
784                 GOTO(out, rc = -ENODEV);
785
786         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
787         rc = llog_connect(ctxt, NULL, NULL, uuid);
788         llog_ctxt_put(ctxt);
789         if (rc != 0) {
790                 CERROR("%s failed at llog_origin_connect: %d\n",
791                        obd_uuid2str(uuid), rc);
792                 GOTO(out, rc);
793         }
794
795         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
796               obd->obd_name, obd_uuid2str(uuid));
797         rc = mds_lov_clear_orphans(mds, uuid);
798         if (rc != 0) {
799                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
800                        obd_uuid2str(uuid), rc);
801                 GOTO(out, rc);
802         }
803
804         if (obd->obd_upcall.onu_owner) {
805                 /*
806                  * This is a hack for mds_notify->mdd_notify. When the mds obd
807                  * in mdd is removed, This hack should be removed.
808                  */
809                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
810                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
811                                                  obd->obd_upcall.onu_owner);
812         }
813         EXIT;
814 out:
815         up_read(&mds->mds_notify_lock);
816         if (rc) {
817                 /* Deactivate it for safety */
818                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
819                        rc);
820                 if (!obd->obd_stopping && mds->mds_osc_obd &&
821                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
822                         obd_notify(mds->mds_osc_obd, watched,
823                                    OBD_NOTIFY_INACTIVE, NULL);
824         }
825
826         class_decref(obd, "mds_lov_synchronize", obd);
827         return rc;
828 }
829
830 int mds_lov_synchronize(void *data)
831 {
832         struct mds_lov_sync_info *mlsi = data;
833         char name[20];
834
835         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
836         ptlrpc_daemonize(name);
837
838         RETURN(__mds_lov_synchronize(data));
839 }
840
841 int mds_lov_start_synchronize(struct obd_device *obd,
842                               struct obd_device *watched,
843                               void *data, int nonblock)
844 {
845         struct mds_lov_sync_info *mlsi;
846         int rc;
847         struct obd_uuid *uuid;
848         ENTRY;
849
850         LASSERT(watched);
851         uuid = &watched->u.cli.cl_target_uuid;
852
853         OBD_ALLOC(mlsi, sizeof(*mlsi));
854         if (mlsi == NULL)
855                 RETURN(-ENOMEM);
856
857         LASSERT(data);
858         mlsi->mlsi_obd = obd;
859         mlsi->mlsi_watched = watched;
860         mlsi->mlsi_index = *(__u32 *)data;
861
862         /* Although class_export_get(obd->obd_self_export) would lock
863            the MDS in place, since it's only a self-export
864            it doesn't lock the LOV in place.  The LOV can be disconnected
865            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
866            Simply taking an export ref on the LOV doesn't help, because it's
867            still disconnected. Taking an obd reference insures that we don't
868            disconnect the LOV.  This of course means a cleanup won't
869            finish for as long as the sync is blocking. */
870         class_incref(obd, "mds_lov_synchronize", obd);
871
872         if (nonblock) {
873                 /* Synchronize in the background */
874                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
875                                        CLONE_VM | CLONE_FILES);
876                 if (rc < 0) {
877                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
878                                obd->obd_name, rc);
879                         class_decref(obd, "mds_lov_synchronize", obd);
880                 } else {
881                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
882                                "thread=%d\n", obd->obd_name,
883                                mlsi->mlsi_index, rc);
884                         rc = 0;
885                 }
886         } else {
887                 rc = __mds_lov_synchronize((void *)mlsi);
888         }
889
890         RETURN(rc);
891 }
892
893 int mds_notify(struct obd_device *obd, struct obd_device *watched,
894                enum obd_notify_event ev, void *data)
895 {
896         int rc = 0;
897         ENTRY;
898
899         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
900
901         switch (ev) {
902         /* We only handle these: */
903         case OBD_NOTIFY_ACTIVE:
904                 /* lov want one or more _active_ targets for work */
905                 /* activate event should be pass lov idx as argument */
906         case OBD_NOTIFY_SYNC:
907         case OBD_NOTIFY_SYNC_NONBLOCK:
908                 /* sync event should be pass lov idx as argument */
909                 break;
910         case OBD_NOTIFY_CONFIG:
911         default:
912                 RETURN(0);
913         }
914
915         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
916                 CERROR("unexpected notification of %s %s!\n",
917                        watched->obd_type->typ_name, watched->obd_name);
918                 RETURN(-EINVAL);
919         }
920
921         if (obd->obd_recovering) {
922                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
923                       obd->obd_name,
924                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
925                 /* We still have to fix the lov descriptor for ost's added
926                    after the mdt in the config log.  They didn't make it into
927                    mds_lov_connect. */
928                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
929                                         &watched->u.cli.cl_target_uuid);
930                 RETURN(rc);
931         }
932
933         rc = mds_lov_start_synchronize(obd, watched, data,
934                                        !(ev == OBD_NOTIFY_SYNC));
935
936         RETURN(rc);
937 }