Whamcloud - gitweb
b=14836
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 static int mds_lov_read_objids(struct obd_device *obd)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         loff_t off = 0;
166         int i, rc, count = 0, page = 0;
167         unsigned long size;
168         ENTRY;
169
170         /* Read everything in the file, even if our current lov desc
171            has fewer targets. Old targets not in the lov descriptor
172            during mds setup may still have valid objids. */
173         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
174         if (size == 0)
175                 RETURN(0);
176
177         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
179
180         for (i = 0; i < page; i++) {
181                 obd_id *data =  mds->mds_lov_page_array[i];
182                 loff_t off_old = off;
183
184                 LASSERT(data == NULL);
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         GOTO(out, rc = -ENOMEM);
188
189                 mds->mds_lov_page_array[i] = data;
190
191                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
193                 if (rc < 0) {
194                         CERROR("Error reading objids %d\n", rc);
195                         GOTO(out, rc);
196                 }
197                 if (off == off_old)
198                         break; // eof
199
200                 count += (off - off_old)/sizeof(obd_id);
201         }
202         mds->mds_lov_objid_count = count;
203         if (count) {
204                 count --;
205                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
207         }
208         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
210 out:
211         mds_lov_dump_objids("read",obd);
212
213         RETURN(0);
214 }
215
216 int mds_lov_write_objids(struct obd_device *obd)
217 {
218         struct mds_obd *mds = &obd->u.mds;
219         int i, rc = 0;
220         ENTRY;
221
222         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
223                 RETURN(0);
224
225         mds_lov_dump_objids("write", obd);
226
227         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228                 obd_id *data =  mds->mds_lov_page_array[i];
229                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230                 loff_t off = i * size;
231
232                 LASSERT(data != NULL);
233
234                 /* check for particaly filled last page */
235                 if (i == mds->mds_lov_objid_lastpage)
236                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
237
238                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
239                                          size, &off, 0);
240                 if (rc < 0)
241                         break;
242                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
243         }
244         if (rc >= 0)
245                 rc = 0;
246
247         RETURN(rc);
248 }
249 EXPORT_SYMBOL(mds_lov_write_objids);
250
251 static int mds_lov_get_objid(struct obd_device * obd,
252                              __u32 idx)
253 {
254         struct mds_obd *mds = &obd->u.mds;
255         unsigned int page;
256         unsigned int off;
257         obd_id *data;
258         int rc = 0;
259         ENTRY;
260
261         page = idx / OBJID_PER_PAGE();
262         off = idx % OBJID_PER_PAGE();
263         data = mds->mds_lov_page_array[page];
264         if (data == NULL) {
265                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
266                 if (data == NULL)
267                         GOTO(out, rc = -ENOMEM);
268
269                 mds->mds_lov_page_array[page] = data;
270         }
271
272         if (data[off] == 0) {
273                 /* We never read this lastid; ask the osc */
274                 struct obd_id_info lastid;
275                 __u32 size = sizeof(lastid);
276
277                 lastid.idx = idx;
278                 lastid.data = &data[off];
279                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280                                   KEY_LAST_ID, &size, &lastid, NULL);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 if (idx > mds->mds_lov_objid_count) {
285                         mds->mds_lov_objid_count = idx;
286                         mds->mds_lov_objid_lastpage = page;
287                         mds->mds_lov_objid_lastidx = off;
288                 }
289                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
290         }
291 out:
292         RETURN(rc);
293 }
294
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
296 {
297         int rc;
298         struct obdo oa;
299         struct obd_trans_info oti = {0};
300         struct lov_stripe_md  *empty_ea = NULL;
301         ENTRY;
302
303         LASSERT(mds->mds_lov_page_array != NULL);
304
305         /* This create will in fact either create or destroy:  If the OST is
306          * missing objects below this ID, they will be created.  If it finds
307          * objects above this ID, they will be removed. */
308         memset(&oa, 0, sizeof(oa));
309         oa.o_flags = OBD_FL_DELORPHAN;
310         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312         if (ost_uuid != NULL)
313                 oti.oti_ost_uuid = ost_uuid;       
314         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
315
316         RETURN(rc);
317 }
318
319 /* for one target */
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
321 {
322         struct mds_obd *mds = &obd->u.mds;
323         int rc;
324         struct obd_id_info info;
325         ENTRY;
326
327         LASSERT(!obd->obd_recovering);
328
329         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
331
332         info.idx = idx;
333         info.data = id;
334         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
336         if (rc)
337                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
338                         obd->obd_name, rc);
339
340         RETURN(rc);
341 }
342
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
345 {
346         struct mds_obd *mds = &obd->u.mds;
347         struct lov_desc *ld;
348         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
349         int rc = 0;
350         ENTRY;
351
352         OBD_ALLOC(ld, sizeof(*ld));
353         if (!ld)
354                 RETURN(-ENOMEM);
355
356         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
357                           &valsize, ld, NULL);
358         if (rc)
359                 GOTO(out, rc);
360
361         /* Don't change the mds_lov_desc until the objids size matches the
362            count (paranoia) */
363         mds->mds_lov_desc = *ld;
364         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
365                mds->mds_lov_desc.ld_tgt_count);
366
367         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
368                                mds->mds_lov_desc.ld_tgt_count);
369
370         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
371         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
372         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
373                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
374                stripes);
375
376         /* If we added a target we have to reconnect the llogs */
377         /* We only _need_ to do this at first add (idx), or the first time
378            after recovery.  However, it should now be safe to call anytime. */
379         rc = llog_cat_initialize(obd, &obd->obd_olg,
380                                  mds->mds_lov_desc.ld_tgt_count, NULL);
381
382         /*XXX this notifies the MDD until lov handling use old mds code */
383         if (obd->obd_upcall.onu_owner) {
384                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
385                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
386                                                  obd->obd_upcall.onu_owner);
387         }
388 out:
389         OBD_FREE(ld, sizeof(*ld));
390         RETURN(rc);
391 }
392
393 /* Inform MDS about new/updated target */
394 static int mds_lov_update_mds(struct obd_device *obd,
395                               struct obd_device *watched,
396                               __u32 idx)
397 {
398         struct mds_obd *mds = &obd->u.mds;
399         int rc = 0;
400         int page;
401         int off;
402         obd_id *data;
403
404         ENTRY;
405
406         /* Don't let anyone else mess with mds_lov_objids now */
407         mutex_down(&obd->obd_dev_sem);
408
409         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
410         if (rc)
411                 GOTO(out, rc);
412
413         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
414                idx, obd->obd_recovering, obd->obd_async_recov,
415                mds->mds_lov_desc.ld_tgt_count);
416
417         /* idx is set as data from lov_notify. */
418         if (obd->obd_recovering)
419                 GOTO(out, rc);
420
421         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
422                 CERROR("index %d > count %d!\n", idx,
423                        mds->mds_lov_desc.ld_tgt_count);
424                 GOTO(out, rc = -EINVAL);
425         }
426
427         rc = mds_lov_get_objid(obd, idx);
428         if (rc)
429                 GOTO(out, rc);
430
431         page = idx / OBJID_PER_PAGE();
432         off = idx % OBJID_PER_PAGE();
433         data = mds->mds_lov_page_array[page];
434
435         /* We have read this lastid from disk; tell the osc.
436            Don't call this during recovery. */
437         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
438         if (rc) {
439                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
440                 /* Don't abort the rest of the sync */
441                 rc = 0;
442         } else {
443                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
444                         data[off], idx, rc);
445         }
446 out:
447         mutex_up(&obd->obd_dev_sem);
448         RETURN(rc);
449 }
450
451 /* update the LOV-OSC knowledge of the last used object id's */
452 int mds_lov_connect(struct obd_device *obd, char * lov_name)
453 {
454         struct mds_obd *mds = &obd->u.mds;
455         struct lustre_handle conn = {0,};
456         struct obd_connect_data *data;
457         int rc;
458         ENTRY;
459
460         if (IS_ERR(mds->mds_osc_obd))
461                 RETURN(PTR_ERR(mds->mds_osc_obd));
462
463         if (mds->mds_osc_obd)
464                 RETURN(0);
465
466         mds->mds_osc_obd = class_name2obd(lov_name);
467         if (!mds->mds_osc_obd) {
468                 CERROR("MDS cannot locate LOV %s\n", lov_name);
469                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
470                 RETURN(-ENOTCONN);
471         }
472
473         OBD_ALLOC(data, sizeof(*data));
474         if (data == NULL)
475                 RETURN(-ENOMEM);
476         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
477                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
478                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
479                                   OBD_CONNECT_AT;
480 #ifdef HAVE_LRU_RESIZE_SUPPORT
481         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
482 #endif
483         data->ocd_version = LUSTRE_VERSION_CODE;
484         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
485         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
486         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
487         OBD_FREE(data, sizeof(*data));
488         if (rc) {
489                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
490                 mds->mds_osc_obd = ERR_PTR(rc);
491                 RETURN(rc);
492         }
493         mds->mds_osc_exp = class_conn2export(&conn);
494
495         rc = obd_register_observer(mds->mds_osc_obd, obd);
496         if (rc) {
497                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
498                        lov_name, rc);
499                 GOTO(err_discon, rc);
500         }
501
502         /* Deny new client connections until we are sure we have some OSTs */
503         obd->obd_no_conn = 1;
504
505         mutex_down(&obd->obd_dev_sem);
506         rc = mds_lov_read_objids(obd);
507         if (rc) {
508                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
509                 GOTO(err_reg, rc);
510         }
511
512         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
513         if (rc)
514                 GOTO(err_reg, rc);
515
516         /* tgt_count may be 0! */
517         rc = llog_cat_initialize(obd, &obd->obd_olg, 
518                                  mds->mds_lov_desc.ld_tgt_count, NULL);
519         if (rc) {
520                 CERROR("failed to initialize catalog %d\n", rc);
521                 GOTO(err_reg, rc);
522         }
523
524         /* If we're mounting this code for the first time on an existing FS,
525          * we need to populate the objids array from the real OST values */
526         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
527                 __u32 i = mds->mds_lov_objid_count;
528                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
529                         rc = mds_lov_get_objid(obd, i);
530                         if (rc != 0)
531                                 break;
532                 }
533                 if (rc == 0)
534                         rc = mds_lov_write_objids(obd);
535                 if (rc)
536                         CERROR("got last objids from OSTs, but error "
537                                 "in update objids file: %d\n", rc);
538         }
539         mutex_up(&obd->obd_dev_sem);
540
541         /* I want to see a callback happen when the OBD moves to a
542          * "For General Use" state, and that's when we'll call
543          * set_nextid().  The class driver can help us here, because
544          * it can use the obd_recovering flag to determine when the
545          * the OBD is full available. */
546         /* MDD device will care about that
547         if (!obd->obd_recovering)
548                 rc = mds_postrecov(obd);
549          */
550         RETURN(rc);
551
552 err_reg:
553         mutex_up(&obd->obd_dev_sem);
554         obd_register_observer(mds->mds_osc_obd, NULL);
555 err_discon:
556         obd_disconnect(mds->mds_osc_exp);
557         mds->mds_osc_exp = NULL;
558         mds->mds_osc_obd = ERR_PTR(rc);
559         RETURN(rc);
560 }
561
562 int mds_lov_disconnect(struct obd_device *obd)
563 {
564         struct mds_obd *mds = &obd->u.mds;
565         int rc = 0;
566         ENTRY;
567
568         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
569                 obd_register_observer(mds->mds_osc_obd, NULL);
570
571                 /* The actual disconnect of the mds_lov will be called from
572                  * class_disconnect_exports from mds_lov_clean. So we have to
573                  * ensure that class_cleanup doesn't fail due to the extra ref
574                  * we're holding now. The mechanism to do that already exists -
575                  * the obd_force flag. We'll drop the final ref to the
576                  * mds_osc_exp in mds_cleanup. */
577                 mds->mds_osc_obd->obd_force = 1;
578         }
579
580         RETURN(rc);
581 }
582
583 /* Collect the preconditions we need to allow client connects */
584 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
585 {
586         if (flag & CONFIG_LOG)
587                 obd->u.mds.mds_fl_cfglog = 1;
588         if (flag & CONFIG_SYNC)
589                 obd->u.mds.mds_fl_synced = 1;
590         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
591                 /* Open for clients */
592                 obd->obd_no_conn = 0;
593 }
594
595 struct mds_lov_sync_info {
596         struct obd_device *mlsi_obd;     /* the lov device to sync */
597         struct obd_device *mlsi_watched; /* target osc */
598         __u32              mlsi_index;   /* index of target */
599 };
600
601 static int mds_propagate_capa_keys(struct mds_obd *mds)
602 {
603         struct lustre_capa_key *key;
604         int i, rc = 0;
605
606         ENTRY;
607
608         if (!mds->mds_capa_keys)
609                 RETURN(0);
610
611         for (i = 0; i < 2; i++) {
612                 key = &mds->mds_capa_keys[i];
613                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
614
615                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
616                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
617                 if (rc) {
618                         DEBUG_CAPA_KEY(D_ERROR, key,
619                                        "propagate failed (rc = %d) for", rc);
620                         RETURN(rc);
621                 }
622         }
623
624         RETURN(0);
625 }
626
627 /* We only sync one osc at a time, so that we don't have to hold
628    any kind of lock on the whole mds_lov_desc, which may change
629    (grow) as a result of mds_lov_add_ost.  This also avoids any
630    kind of mismatch between the lov_desc and the mds_lov_desc,
631    which are not in lock-step during lov_add_obd */
632 static int __mds_lov_synchronize(void *data)
633 {
634         struct mds_lov_sync_info *mlsi = data;
635         struct obd_device *obd = mlsi->mlsi_obd;
636         struct obd_device *watched = mlsi->mlsi_watched;
637         struct mds_obd *mds = &obd->u.mds;
638         struct obd_uuid *uuid;
639         __u32  idx = mlsi->mlsi_index;
640         struct mds_group_info mgi;
641         struct llog_ctxt *ctxt;
642         int rc = 0;
643         ENTRY;
644
645         OBD_FREE_PTR(mlsi);
646
647         LASSERT(obd);
648         LASSERT(watched);
649         uuid = &watched->u.cli.cl_target_uuid;
650         LASSERT(uuid);
651
652         down_read(&mds->mds_notify_lock);
653         if (obd->obd_stopping || obd->obd_fail)
654                 GOTO(out, rc = -ENODEV);
655
656         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
657         rc = mds_lov_update_mds(obd, watched, idx);
658         if (rc != 0) {
659                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
660                 GOTO(out, rc);
661         }
662         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
663         mgi.uuid = uuid;
664
665         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
666                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
667         if (rc != 0)
668                 GOTO(out, rc);
669         /* propagate capability keys */
670         rc = mds_propagate_capa_keys(mds);
671         if (rc)
672                 GOTO(out, rc);
673
674         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
675         if (!ctxt) 
676                 GOTO(out, rc = -ENODEV);
677
678         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
679         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
680                           NULL, NULL, uuid); 
681         llog_ctxt_put(ctxt);
682         if (rc != 0) {
683                 CERROR("%s failed at llog_origin_connect: %d\n",
684                        obd_uuid2str(uuid), rc);
685                 GOTO(out, rc);
686         }
687
688         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
689               obd->obd_name, obd_uuid2str(uuid));
690         rc = mds_lov_clear_orphans(mds, uuid);
691         if (rc != 0) {
692                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
693                        obd_uuid2str(uuid), rc);
694                 GOTO(out, rc);
695         }
696
697         if (obd->obd_upcall.onu_owner) {
698                 /*
699                  * This is a hack for mds_notify->mdd_notify. When the mds obd
700                  * in mdd is removed, This hack should be removed.
701                  */
702                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
703                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
704                                                  obd->obd_upcall.onu_owner);
705         }
706         EXIT;
707 out:
708         up_read(&mds->mds_notify_lock);
709         if (rc) {
710                 /* Deactivate it for safety */
711                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
712                        rc);
713                 if (!obd->obd_stopping && mds->mds_osc_obd &&
714                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
715                         obd_notify(mds->mds_osc_obd, watched,
716                                    OBD_NOTIFY_INACTIVE, NULL);
717         }
718
719         class_decref(obd);
720         return rc;
721 }
722
723 int mds_lov_synchronize(void *data)
724 {
725         struct mds_lov_sync_info *mlsi = data;
726         char name[20];
727
728         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
729         ptlrpc_daemonize(name);
730
731         RETURN(__mds_lov_synchronize(data));
732 }
733
734 int mds_lov_start_synchronize(struct obd_device *obd,
735                               struct obd_device *watched,
736                               void *data, int nonblock)
737 {
738         struct mds_lov_sync_info *mlsi;
739         int rc;
740         struct obd_uuid *uuid;
741         ENTRY;
742
743         LASSERT(watched);
744         uuid = &watched->u.cli.cl_target_uuid;
745
746         OBD_ALLOC(mlsi, sizeof(*mlsi));
747         if (mlsi == NULL)
748                 RETURN(-ENOMEM);
749
750         mlsi->mlsi_obd = obd;
751         mlsi->mlsi_watched = watched;
752         mlsi->mlsi_index = *(__u32 *)data;
753
754         /* Although class_export_get(obd->obd_self_export) would lock
755            the MDS in place, since it's only a self-export
756            it doesn't lock the LOV in place.  The LOV can be disconnected
757            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
758            Simply taking an export ref on the LOV doesn't help, because it's
759            still disconnected. Taking an obd reference insures that we don't
760            disconnect the LOV.  This of course means a cleanup won't
761            finish for as long as the sync is blocking. */
762         class_incref(obd);
763
764         if (nonblock) {
765                 /* Synchronize in the background */
766                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
767                                        CLONE_VM | CLONE_FILES);
768                 if (rc < 0) {
769                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
770                                obd->obd_name, rc);
771                         class_decref(obd);
772                 } else {
773                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
774                                "thread=%d\n", obd->obd_name,
775                                mlsi->mlsi_index, rc);
776                         rc = 0;
777                 }
778         } else {
779                 rc = __mds_lov_synchronize((void *)mlsi);
780         }
781
782         RETURN(rc);
783 }
784
785 int mds_notify(struct obd_device *obd, struct obd_device *watched,
786                enum obd_notify_event ev, void *data)
787 {
788         int rc = 0;
789         ENTRY;
790
791         switch (ev) {
792         /* We only handle these: */
793         case OBD_NOTIFY_ACTIVE:
794         case OBD_NOTIFY_SYNC:
795         case OBD_NOTIFY_SYNC_NONBLOCK:
796                 break;
797         case OBD_NOTIFY_CONFIG:
798                 mds_allow_cli(obd, (unsigned long)data);
799         default:
800                 RETURN(0);
801         }
802
803         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
804         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
805                 CERROR("unexpected notification of %s %s!\n",
806                        watched->obd_type->typ_name, watched->obd_name);
807                 RETURN(-EINVAL);
808         }
809
810         if (obd->obd_recovering) {
811                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
812                       obd->obd_name,
813                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
814                 /* We still have to fix the lov descriptor for ost's added
815                    after the mdt in the config log.  They didn't make it into
816                    mds_lov_connect. */
817                 mutex_down(&obd->obd_dev_sem);
818                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
819                 if (rc) {
820                         mutex_up(&obd->obd_dev_sem);
821                         RETURN(rc);
822                 }
823                 /* We should update init llog here too for replay unlink and 
824                  * possiable llog init race when recovery complete */
825                 llog_cat_initialize(obd, &obd->obd_olg, 
826                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
827                                     &watched->u.cli.cl_target_uuid);
828                 mutex_up(&obd->obd_dev_sem);
829                 mds_allow_cli(obd, CONFIG_SYNC);
830                 RETURN(rc);
831         }
832
833         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
834         rc = mds_lov_start_synchronize(obd, watched, data,
835                                        !(ev == OBD_NOTIFY_SYNC));
836
837         lquota_recovery(mds_quota_interface_ref, obd);
838
839         RETURN(rc);
840 }