Whamcloud - gitweb
b=16227
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 static int mds_lov_read_objids(struct obd_device *obd)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         loff_t off = 0;
166         int i, rc, count = 0, page = 0;
167         unsigned long size;
168         ENTRY;
169
170         /* Read everything in the file, even if our current lov desc
171            has fewer targets. Old targets not in the lov descriptor
172            during mds setup may still have valid objids. */
173         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
174         if (size == 0)
175                 RETURN(0);
176
177         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
179
180         for (i = 0; i < page; i++) {
181                 obd_id *data =  mds->mds_lov_page_array[i];
182                 loff_t off_old = off;
183
184                 LASSERT(data == NULL);
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         GOTO(out, rc = -ENOMEM);
188
189                 mds->mds_lov_page_array[i] = data;
190
191                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
193                 if (rc < 0) {
194                         CERROR("Error reading objids %d\n", rc);
195                         GOTO(out, rc);
196                 }
197                 if (off == off_old)
198                         break; // eof
199
200                 count += (off - off_old)/sizeof(obd_id);
201         }
202         mds->mds_lov_objid_count = count;
203         if (count) {
204                 count --;
205                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
207         }
208         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
210 out:
211         mds_lov_dump_objids("read",obd);
212
213         RETURN(0);
214 }
215
216 int mds_lov_write_objids(struct obd_device *obd)
217 {
218         struct mds_obd *mds = &obd->u.mds;
219         int i, rc = 0;
220         ENTRY;
221
222         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
223                 RETURN(0);
224
225         mds_lov_dump_objids("write", obd);
226
227         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228                 obd_id *data =  mds->mds_lov_page_array[i];
229                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230                 loff_t off = i * size;
231
232                 LASSERT(data != NULL);
233
234                 /* check for particaly filled last page */
235                 if (i == mds->mds_lov_objid_lastpage)
236                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
237
238                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
239                                          size, &off, 0);
240                 if (rc < 0)
241                         break;
242                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
243         }
244         if (rc >= 0)
245                 rc = 0;
246
247         RETURN(rc);
248 }
249 EXPORT_SYMBOL(mds_lov_write_objids);
250
251 static int mds_lov_get_objid(struct obd_device * obd,
252                              __u32 idx)
253 {
254         struct mds_obd *mds = &obd->u.mds;
255         unsigned int page;
256         unsigned int off;
257         obd_id *data;
258         int rc = 0;
259         ENTRY;
260
261         page = idx / OBJID_PER_PAGE();
262         off = idx % OBJID_PER_PAGE();
263         data = mds->mds_lov_page_array[page];
264         if (data == NULL) {
265                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
266                 if (data == NULL)
267                         GOTO(out, rc = -ENOMEM);
268
269                 mds->mds_lov_page_array[page] = data;
270         }
271
272         if (data[off] == 0) {
273                 /* We never read this lastid; ask the osc */
274                 struct obd_id_info lastid;
275                 __u32 size = sizeof(lastid);
276
277                 lastid.idx = idx;
278                 lastid.data = &data[off];
279                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280                                   KEY_LAST_ID, &size, &lastid, NULL);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 if (idx > mds->mds_lov_objid_count) {
285                         mds->mds_lov_objid_count = idx;
286                         mds->mds_lov_objid_lastpage = page;
287                         mds->mds_lov_objid_lastidx = off;
288                 }
289                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
290         }
291 out:
292         RETURN(rc);
293 }
294
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
296 {
297         int rc;
298         struct obdo oa;
299         struct obd_trans_info oti = {0};
300         struct lov_stripe_md  *empty_ea = NULL;
301         ENTRY;
302
303         LASSERT(mds->mds_lov_page_array != NULL);
304
305         /* This create will in fact either create or destroy:  If the OST is
306          * missing objects below this ID, they will be created.  If it finds
307          * objects above this ID, they will be removed. */
308         memset(&oa, 0, sizeof(oa));
309         oa.o_flags = OBD_FL_DELORPHAN;
310         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312         if (ost_uuid != NULL) {
313                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
314                 oa.o_valid |= OBD_MD_FLINLINE;
315         }
316         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
317
318         RETURN(rc);
319 }
320
321 /* for one target */
322 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
323 {
324         struct mds_obd *mds = &obd->u.mds;
325         int rc;
326         struct obd_id_info info;
327         ENTRY;
328
329         LASSERT(!obd->obd_recovering);
330
331         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
332         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
333
334         info.idx = idx;
335         info.data = id;
336         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
337                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
338         if (rc)
339                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
340                         obd->obd_name, rc);
341
342         RETURN(rc);
343 }
344
345 static __u32 mds_lov_get_idx(struct obd_export *lov,
346                              struct obd_uuid *ost_uuid)
347 {
348         int rc;
349         int valsize = sizeof(ost_uuid);
350
351         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
352                           &valsize, ost_uuid, NULL);
353         LASSERT(rc >= 0);
354
355         RETURN(rc);
356 }
357
358 /* Update the lov desc for a new size lov. */
359 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
360 {
361         struct mds_obd *mds = &obd->u.mds;
362         struct lov_desc *ld;
363         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
364         int rc = 0;
365         ENTRY;
366
367         OBD_ALLOC(ld, sizeof(*ld));
368         if (!ld)
369                 RETURN(-ENOMEM);
370
371         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
372                           &valsize, ld, NULL);
373         if (rc)
374                 GOTO(out, rc);
375
376         /* Don't change the mds_lov_desc until the objids size matches the
377            count (paranoia) */
378         mds->mds_lov_desc = *ld;
379         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
380                mds->mds_lov_desc.ld_tgt_count);
381
382         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
383                                mds->mds_lov_desc.ld_tgt_count);
384
385         mds->mds_max_mdsize = lov_mds_md_size(stripes);
386         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
387         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
389                stripes);
390
391         /* If we added a target we have to reconnect the llogs */
392         /* We only _need_ to do this at first add (idx), or the first time
393            after recovery.  However, it should now be safe to call anytime. */
394         rc = llog_cat_initialize(obd, &obd->obd_olg,
395                                  mds->mds_lov_desc.ld_tgt_count, NULL);
396
397         /*XXX this notifies the MDD until lov handling use old mds code */
398         if (obd->obd_upcall.onu_owner) {
399                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
400                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
401                                                  obd->obd_upcall.onu_owner);
402         }
403 out:
404         OBD_FREE(ld, sizeof(*ld));
405         RETURN(rc);
406 }
407
408
409 #define MDSLOV_NO_INDEX -1
410
411 /* Inform MDS about new/updated target */
412 static int mds_lov_update_mds(struct obd_device *obd,
413                               struct obd_device *watched,
414                               __u32 idx)
415 {
416         struct mds_obd *mds = &obd->u.mds;
417         int rc = 0;
418         int page;
419         int off;
420         obd_id *data;
421
422         ENTRY;
423
424         /* Don't let anyone else mess with mds_lov_objids now */
425         mutex_down(&obd->obd_dev_sem);
426
427         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
428         if (rc)
429                 GOTO(out, rc);
430
431         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
432                idx, obd->obd_recovering, obd->obd_async_recov,
433                mds->mds_lov_desc.ld_tgt_count);
434
435         /* idx is set as data from lov_notify. */
436         if (obd->obd_recovering)
437                 GOTO(out, rc);
438
439         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
440                 CERROR("index %d > count %d!\n", idx,
441                        mds->mds_lov_desc.ld_tgt_count);
442                 GOTO(out, rc = -EINVAL);
443         }
444
445         rc = mds_lov_get_objid(obd, idx);
446         if (rc)
447                 GOTO(out, rc);
448
449         page = idx / OBJID_PER_PAGE();
450         off = idx % OBJID_PER_PAGE();
451         data = mds->mds_lov_page_array[page];
452
453         /* We have read this lastid from disk; tell the osc.
454            Don't call this during recovery. */
455         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
456         if (rc) {
457                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
458                 /* Don't abort the rest of the sync */
459                 rc = 0;
460         } else {
461                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
462                         data[off], idx, rc);
463         }
464 out:
465         mutex_up(&obd->obd_dev_sem);
466         RETURN(rc);
467 }
468
469 /* update the LOV-OSC knowledge of the last used object id's */
470 int mds_lov_connect(struct obd_device *obd, char * lov_name)
471 {
472         struct mds_obd *mds = &obd->u.mds;
473         struct lustre_handle conn = {0,};
474         struct obd_connect_data *data;
475         int rc;
476         ENTRY;
477
478         if (IS_ERR(mds->mds_osc_obd))
479                 RETURN(PTR_ERR(mds->mds_osc_obd));
480
481         if (mds->mds_osc_obd)
482                 RETURN(0);
483
484         mds->mds_osc_obd = class_name2obd(lov_name);
485         if (!mds->mds_osc_obd) {
486                 CERROR("MDS cannot locate LOV %s\n", lov_name);
487                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
488                 RETURN(-ENOTCONN);
489         }
490
491         OBD_ALLOC(data, sizeof(*data));
492         if (data == NULL)
493                 RETURN(-ENOMEM);
494         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
495                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
496                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
497                                   OBD_CONNECT_AT;
498 #ifdef HAVE_LRU_RESIZE_SUPPORT
499         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
500 #endif
501         data->ocd_version = LUSTRE_VERSION_CODE;
502         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
503         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
504         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
505         OBD_FREE(data, sizeof(*data));
506         if (rc) {
507                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
508                 mds->mds_osc_obd = ERR_PTR(rc);
509                 RETURN(rc);
510         }
511         mds->mds_osc_exp = class_conn2export(&conn);
512
513         rc = obd_register_observer(mds->mds_osc_obd, obd);
514         if (rc) {
515                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
516                        lov_name, rc);
517                 GOTO(err_discon, rc);
518         }
519
520         /* Deny new client connections until we are sure we have some OSTs */
521         obd->obd_no_conn = 1;
522
523         mutex_down(&obd->obd_dev_sem);
524         rc = mds_lov_read_objids(obd);
525         if (rc) {
526                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
527                 GOTO(err_reg, rc);
528         }
529
530         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
531         if (rc)
532                 GOTO(err_reg, rc);
533
534         /* tgt_count may be 0! */
535         rc = llog_cat_initialize(obd, &obd->obd_olg, 
536                                  mds->mds_lov_desc.ld_tgt_count, NULL);
537         if (rc) {
538                 CERROR("failed to initialize catalog %d\n", rc);
539                 GOTO(err_reg, rc);
540         }
541
542         /* If we're mounting this code for the first time on an existing FS,
543          * we need to populate the objids array from the real OST values */
544         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
545                 __u32 i = mds->mds_lov_objid_count;
546                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
547                         rc = mds_lov_get_objid(obd, i);
548                         if (rc != 0)
549                                 break;
550                 }
551                 if (rc == 0)
552                         rc = mds_lov_write_objids(obd);
553                 if (rc)
554                         CERROR("got last objids from OSTs, but error "
555                                 "in update objids file: %d\n", rc);
556         }
557         mutex_up(&obd->obd_dev_sem);
558
559         /* I want to see a callback happen when the OBD moves to a
560          * "For General Use" state, and that's when we'll call
561          * set_nextid().  The class driver can help us here, because
562          * it can use the obd_recovering flag to determine when the
563          * the OBD is full available. */
564         /* MDD device will care about that
565         if (!obd->obd_recovering)
566                 rc = mds_postrecov(obd);
567          */
568         RETURN(rc);
569
570 err_reg:
571         mutex_up(&obd->obd_dev_sem);
572         obd_register_observer(mds->mds_osc_obd, NULL);
573 err_discon:
574         obd_disconnect(mds->mds_osc_exp);
575         mds->mds_osc_exp = NULL;
576         mds->mds_osc_obd = ERR_PTR(rc);
577         RETURN(rc);
578 }
579
580 int mds_lov_disconnect(struct obd_device *obd)
581 {
582         struct mds_obd *mds = &obd->u.mds;
583         int rc = 0;
584         ENTRY;
585
586         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
587                 obd_register_observer(mds->mds_osc_obd, NULL);
588
589                 /* The actual disconnect of the mds_lov will be called from
590                  * class_disconnect_exports from mds_lov_clean. So we have to
591                  * ensure that class_cleanup doesn't fail due to the extra ref
592                  * we're holding now. The mechanism to do that already exists -
593                  * the obd_force flag. We'll drop the final ref to the
594                  * mds_osc_exp in mds_cleanup. */
595                 mds->mds_osc_obd->obd_force = 1;
596         }
597
598         RETURN(rc);
599 }
600
601 /* Collect the preconditions we need to allow client connects */
602 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
603 {
604         if (flag & CONFIG_LOG)
605                 obd->u.mds.mds_fl_cfglog = 1;
606         if (flag & CONFIG_SYNC)
607                 obd->u.mds.mds_fl_synced = 1;
608         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
609                 /* Open for clients */
610                 obd->obd_no_conn = 0;
611 }
612
613 struct mds_lov_sync_info {
614         struct obd_device *mlsi_obd;     /* the lov device to sync */
615         struct obd_device *mlsi_watched; /* target osc */
616         __u32              mlsi_index;   /* index of target */
617 };
618
619 static int mds_propagate_capa_keys(struct mds_obd *mds)
620 {
621         struct lustre_capa_key *key;
622         int i, rc = 0;
623
624         ENTRY;
625
626         if (!mds->mds_capa_keys)
627                 RETURN(0);
628
629         for (i = 0; i < 2; i++) {
630                 key = &mds->mds_capa_keys[i];
631                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
632
633                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
634                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
635                 if (rc) {
636                         DEBUG_CAPA_KEY(D_ERROR, key,
637                                        "propagate failed (rc = %d) for", rc);
638                         RETURN(rc);
639                 }
640         }
641
642         RETURN(0);
643 }
644
645 /* We only sync one osc at a time, so that we don't have to hold
646    any kind of lock on the whole mds_lov_desc, which may change
647    (grow) as a result of mds_lov_add_ost.  This also avoids any
648    kind of mismatch between the lov_desc and the mds_lov_desc,
649    which are not in lock-step during lov_add_obd */
650 static int __mds_lov_synchronize(void *data)
651 {
652         struct mds_lov_sync_info *mlsi = data;
653         struct obd_device *obd = mlsi->mlsi_obd;
654         struct obd_device *watched = mlsi->mlsi_watched;
655         struct mds_obd *mds = &obd->u.mds;
656         struct obd_uuid *uuid;
657         __u32  idx = mlsi->mlsi_index;
658         struct mds_group_info mgi;
659         struct llog_ctxt *ctxt;
660         int rc = 0;
661         ENTRY;
662
663         OBD_FREE_PTR(mlsi);
664
665         LASSERT(obd);
666         LASSERT(watched);
667         uuid = &watched->u.cli.cl_target_uuid;
668         LASSERT(uuid);
669
670         down_read(&mds->mds_notify_lock);
671         if (obd->obd_stopping || obd->obd_fail)
672                 GOTO(out, rc = -ENODEV);
673
674         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
675         rc = mds_lov_update_mds(obd, watched, idx);
676         if (rc != 0) {
677                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
678                 GOTO(out, rc);
679         }
680         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
681         mgi.uuid = uuid;
682
683         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
684                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
685         if (rc != 0)
686                 GOTO(out, rc);
687         /* propagate capability keys */
688         rc = mds_propagate_capa_keys(mds);
689         if (rc)
690                 GOTO(out, rc);
691
692         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
693         if (!ctxt) 
694                 GOTO(out, rc = -ENODEV);
695
696         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
697         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
698                           NULL, NULL, uuid); 
699         llog_ctxt_put(ctxt);
700         if (rc != 0) {
701                 CERROR("%s failed at llog_origin_connect: %d\n",
702                        obd_uuid2str(uuid), rc);
703                 GOTO(out, rc);
704         }
705
706         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
707               obd->obd_name, obd_uuid2str(uuid));
708         rc = mds_lov_clear_orphans(mds, uuid);
709         if (rc != 0) {
710                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
711                        obd_uuid2str(uuid), rc);
712                 GOTO(out, rc);
713         }
714
715         if (obd->obd_upcall.onu_owner) {
716                 /*
717                  * This is a hack for mds_notify->mdd_notify. When the mds obd
718                  * in mdd is removed, This hack should be removed.
719                  */
720                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
721                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
722                                                  obd->obd_upcall.onu_owner);
723         }
724         EXIT;
725 out:
726         up_read(&mds->mds_notify_lock);
727         if (rc) {
728                 /* Deactivate it for safety */
729                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
730                        rc);
731                 if (!obd->obd_stopping && mds->mds_osc_obd &&
732                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
733                         obd_notify(mds->mds_osc_obd, watched,
734                                    OBD_NOTIFY_INACTIVE, NULL);
735         }
736
737         class_decref(obd);
738         return rc;
739 }
740
741 int mds_lov_synchronize(void *data)
742 {
743         struct mds_lov_sync_info *mlsi = data;
744         char name[20];
745
746         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
747         ptlrpc_daemonize(name);
748
749         RETURN(__mds_lov_synchronize(data));
750 }
751
752 int mds_lov_start_synchronize(struct obd_device *obd,
753                               struct obd_device *watched,
754                               void *data, int nonblock)
755 {
756         struct mds_lov_sync_info *mlsi;
757         int rc;
758         struct mds_obd *mds = &obd->u.mds;
759         struct obd_uuid *uuid;
760         ENTRY;
761
762         LASSERT(watched);
763         uuid = &watched->u.cli.cl_target_uuid;
764
765         OBD_ALLOC(mlsi, sizeof(*mlsi));
766         if (mlsi == NULL)
767                 RETURN(-ENOMEM);
768
769         mlsi->mlsi_obd = obd;
770         mlsi->mlsi_watched = watched;
771         if (data)
772                 mlsi->mlsi_index = *(__u32 *)data;
773         else
774                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
775
776         /* Although class_export_get(obd->obd_self_export) would lock
777            the MDS in place, since it's only a self-export
778            it doesn't lock the LOV in place.  The LOV can be disconnected
779            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
780            Simply taking an export ref on the LOV doesn't help, because it's
781            still disconnected. Taking an obd reference insures that we don't
782            disconnect the LOV.  This of course means a cleanup won't
783            finish for as long as the sync is blocking. */
784         class_incref(obd);
785
786         if (nonblock) {
787                 /* Synchronize in the background */
788                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
789                                        CLONE_VM | CLONE_FILES);
790                 if (rc < 0) {
791                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
792                                obd->obd_name, rc);
793                         class_decref(obd);
794                 } else {
795                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
796                                "thread=%d\n", obd->obd_name,
797                                mlsi->mlsi_index, rc);
798                         rc = 0;
799                 }
800         } else {
801                 rc = __mds_lov_synchronize((void *)mlsi);
802         }
803
804         RETURN(rc);
805 }
806
807 int mds_notify(struct obd_device *obd, struct obd_device *watched,
808                enum obd_notify_event ev, void *data)
809 {
810         int rc = 0;
811         ENTRY;
812
813         switch (ev) {
814         /* We only handle these: */
815         case OBD_NOTIFY_ACTIVE:
816         case OBD_NOTIFY_SYNC:
817         case OBD_NOTIFY_SYNC_NONBLOCK:
818                 break;
819         case OBD_NOTIFY_CONFIG:
820                 mds_allow_cli(obd, (unsigned long)data);
821         default:
822                 RETURN(0);
823         }
824
825         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
826         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
827                 CERROR("unexpected notification of %s %s!\n",
828                        watched->obd_type->typ_name, watched->obd_name);
829                 RETURN(-EINVAL);
830         }
831
832         if (obd->obd_recovering) {
833                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
834                       obd->obd_name,
835                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
836                 /* We still have to fix the lov descriptor for ost's added
837                    after the mdt in the config log.  They didn't make it into
838                    mds_lov_connect. */
839                 mutex_down(&obd->obd_dev_sem);
840                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
841                 if (rc) {
842                         mutex_up(&obd->obd_dev_sem);
843                         RETURN(rc);
844                 }
845                 /* We should update init llog here too for replay unlink and 
846                  * possiable llog init race when recovery complete */
847                 llog_cat_initialize(obd, &obd->obd_olg, 
848                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
849                                     &watched->u.cli.cl_target_uuid);
850                 mutex_up(&obd->obd_dev_sem);
851                 mds_allow_cli(obd, CONFIG_SYNC);
852                 RETURN(rc);
853         }
854
855         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
856         rc = mds_lov_start_synchronize(obd, watched, data,
857                                        !(ev == OBD_NOTIFY_SYNC));
858
859         lquota_recovery(mds_quota_interface_ref, obd);
860
861         RETURN(rc);
862 }
863