Whamcloud - gitweb
01fcd1a410bc42d12f0ccccd5cd89ad0e42a14c7
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 static int mds_lov_read_objids(struct obd_device *obd)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         loff_t off = 0;
166         int i, rc, count = 0, page = 0;
167         unsigned long size;
168         ENTRY;
169
170         /* Read everything in the file, even if our current lov desc
171            has fewer targets. Old targets not in the lov descriptor
172            during mds setup may still have valid objids. */
173         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
174         if (size == 0)
175                 RETURN(0);
176
177         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
179
180         for (i = 0; i < page; i++) {
181                 obd_id *data =  mds->mds_lov_page_array[i];
182                 loff_t off_old = off;
183
184                 LASSERT(data == NULL);
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         GOTO(out, rc = -ENOMEM);
188
189                 mds->mds_lov_page_array[i] = data;
190
191                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
193                 if (rc < 0) {
194                         CERROR("Error reading objids %d\n", rc);
195                         GOTO(out, rc);
196                 }
197                 if (off == off_old)
198                         break; // eof
199
200                 count += (off - off_old)/sizeof(obd_id);
201         }
202         mds->mds_lov_objid_count = count;
203         if (count) {
204                 count --;
205                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
207         }
208         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
210 out:
211         mds_lov_dump_objids("read",obd);
212
213         RETURN(0);
214 }
215
216 int mds_lov_write_objids(struct obd_device *obd)
217 {
218         struct mds_obd *mds = &obd->u.mds;
219         int i, rc = 0;
220         ENTRY;
221
222         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
223                 RETURN(0);
224
225         mds_lov_dump_objids("write", obd);
226
227         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228                 obd_id *data =  mds->mds_lov_page_array[i];
229                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230                 loff_t off = i * size;
231
232                 LASSERT(data != NULL);
233
234                 /* check for particaly filled last page */
235                 if (i == mds->mds_lov_objid_lastpage)
236                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
237
238                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
239                                          size, &off, 0);
240                 if (rc < 0)
241                         break;
242                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
243         }
244         if (rc >= 0)
245                 rc = 0;
246
247         RETURN(rc);
248 }
249 EXPORT_SYMBOL(mds_lov_write_objids);
250
251 static int mds_lov_get_objid(struct obd_device * obd,
252                              __u32 idx)
253 {
254         struct mds_obd *mds = &obd->u.mds;
255         unsigned int page;
256         unsigned int off;
257         obd_id *data;
258         int rc = 0;
259         ENTRY;
260
261         page = idx / OBJID_PER_PAGE();
262         off = idx % OBJID_PER_PAGE();
263         data = mds->mds_lov_page_array[page];
264         if (data == NULL) {
265                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
266                 if (data == NULL)
267                         GOTO(out, rc = -ENOMEM);
268
269                 mds->mds_lov_page_array[page] = data;
270         }
271
272         if (data[off] == 0) {
273                 /* We never read this lastid; ask the osc */
274                 struct obd_id_info lastid;
275                 __u32 size = sizeof(lastid);
276
277                 lastid.idx = idx;
278                 lastid.data = &data[off];
279                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280                                   KEY_LAST_ID, &size, &lastid, NULL);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 if (idx > mds->mds_lov_objid_count) {
285                         mds->mds_lov_objid_count = idx;
286                         mds->mds_lov_objid_lastpage = page;
287                         mds->mds_lov_objid_lastidx = off;
288                 }
289                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
290         }
291 out:
292         RETURN(rc);
293 }
294
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
296 {
297         int rc;
298         struct obdo oa;
299         struct obd_trans_info oti = {0};
300         struct lov_stripe_md  *empty_ea = NULL;
301         ENTRY;
302
303         LASSERT(mds->mds_lov_page_array != NULL);
304
305         /* This create will in fact either create or destroy:  If the OST is
306          * missing objects below this ID, they will be created.  If it finds
307          * objects above this ID, they will be removed. */
308         memset(&oa, 0, sizeof(oa));
309         oa.o_flags = OBD_FL_DELORPHAN;
310         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312         if (ost_uuid != NULL)
313                 oti.oti_ost_uuid = ost_uuid;       
314         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
315
316         RETURN(rc);
317 }
318
319 /* for one target */
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
321 {
322         struct mds_obd *mds = &obd->u.mds;
323         int rc;
324         struct obd_id_info info;
325         ENTRY;
326
327         LASSERT(!obd->obd_recovering);
328
329         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
331
332         info.idx = idx;
333         info.data = id;
334         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
336         if (rc)
337                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
338                         obd->obd_name, rc);
339
340         RETURN(rc);
341 }
342
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, int idx,
345                                struct obd_uuid *uuid)
346 {
347         struct mds_obd *mds = &obd->u.mds;
348         struct lov_desc *ld;
349         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
350         int rc = 0;
351         ENTRY;
352
353         OBD_ALLOC(ld, sizeof(*ld));
354         if (!ld)
355                 RETURN(-ENOMEM);
356
357         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
358                           &valsize, ld, NULL);
359         if (rc)
360                 GOTO(out, rc);
361
362         /* Don't change the mds_lov_desc until the objids size matches the
363            count (paranoia) */
364         mds->mds_lov_desc = *ld;
365         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
366                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
367
368         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369                                mds->mds_lov_desc.ld_tgt_count);
370
371         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
372         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
374                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
375                stripes);
376
377         /* If we added a target we have to reconnect the llogs */
378         /* We only _need_ to do this at first add (idx), or the first time
379            after recovery.  However, it should now be safe to call anytime. */
380         rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
381         if (rc)
382                 GOTO(out, rc);
383
384         /*XXX this notifies the MDD until lov handling use old mds code */
385         if (obd->obd_upcall.onu_owner) {
386                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
387                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
388                                                  obd->obd_upcall.onu_owner);
389         }
390 out:
391         OBD_FREE(ld, sizeof(*ld));
392         RETURN(rc);
393 }
394
395 /* Inform MDS about new/updated target */
396 static int mds_lov_update_mds(struct obd_device *obd,
397                               struct obd_device *watched,
398                               __u32 idx)
399 {
400         struct mds_obd *mds = &obd->u.mds;
401         int rc = 0;
402         int page;
403         int off;
404         obd_id *data;
405
406         ENTRY;
407
408         /* Don't let anyone else mess with mds_lov_objids now */
409         mutex_down(&obd->obd_dev_sem);
410
411         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
412         if (rc)
413                 GOTO(out, rc);
414
415         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
416                idx, obd->obd_recovering, obd->obd_async_recov,
417                mds->mds_lov_desc.ld_tgt_count);
418
419         /* idx is set as data from lov_notify. */
420         if (obd->obd_recovering)
421                 GOTO(out, rc);
422
423         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
424                 CERROR("index %d > count %d!\n", idx,
425                        mds->mds_lov_desc.ld_tgt_count);
426                 GOTO(out, rc = -EINVAL);
427         }
428
429         rc = mds_lov_get_objid(obd, idx);
430         if (rc)
431                 GOTO(out, rc);
432
433         page = idx / OBJID_PER_PAGE();
434         off = idx % OBJID_PER_PAGE();
435         data = mds->mds_lov_page_array[page];
436
437         /* We have read this lastid from disk; tell the osc.
438            Don't call this during recovery. */
439         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
440         if (rc) {
441                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
442                 /* Don't abort the rest of the sync */
443                 rc = 0;
444         } else {
445                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
446                         data[off], idx, rc);
447         }
448 out:
449         mutex_up(&obd->obd_dev_sem);
450         RETURN(rc);
451 }
452
453 /* update the LOV-OSC knowledge of the last used object id's */
454 int mds_lov_connect(struct obd_device *obd, char * lov_name)
455 {
456         struct mds_obd *mds = &obd->u.mds;
457         struct lustre_handle conn = {0,};
458         struct obd_connect_data *data;
459         int rc;
460         ENTRY;
461
462         if (IS_ERR(mds->mds_osc_obd))
463                 RETURN(PTR_ERR(mds->mds_osc_obd));
464
465         if (mds->mds_osc_obd)
466                 RETURN(0);
467
468         mds->mds_osc_obd = class_name2obd(lov_name);
469         if (!mds->mds_osc_obd) {
470                 CERROR("MDS cannot locate LOV %s\n", lov_name);
471                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
472                 RETURN(-ENOTCONN);
473         }
474
475         OBD_ALLOC(data, sizeof(*data));
476         if (data == NULL)
477                 RETURN(-ENOMEM);
478         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
479                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
480                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
481                                   OBD_CONNECT_AT;
482 #ifdef HAVE_LRU_RESIZE_SUPPORT
483         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
484 #endif
485         data->ocd_version = LUSTRE_VERSION_CODE;
486         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
487         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
488         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
489         OBD_FREE(data, sizeof(*data));
490         if (rc) {
491                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
492                 mds->mds_osc_obd = ERR_PTR(rc);
493                 RETURN(rc);
494         }
495         mds->mds_osc_exp = class_conn2export(&conn);
496
497         rc = obd_register_observer(mds->mds_osc_obd, obd);
498         if (rc) {
499                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
500                        lov_name, rc);
501                 GOTO(err_discon, rc);
502         }
503
504         /* Deny new client connections until we are sure we have some OSTs */
505         obd->obd_no_conn = 1;
506
507         mutex_down(&obd->obd_dev_sem);
508         rc = mds_lov_read_objids(obd);
509         if (rc) {
510                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
511                 GOTO(err_reg, rc);
512         }
513         mutex_up(&obd->obd_dev_sem);
514
515         /* I want to see a callback happen when the OBD moves to a
516          * "For General Use" state, and that's when we'll call
517          * set_nextid().  The class driver can help us here, because
518          * it can use the obd_recovering flag to determine when the
519          * the OBD is full available. */
520         /* MDD device will care about that
521         if (!obd->obd_recovering)
522                 rc = mds_postrecov(obd);
523          */
524         RETURN(rc);
525
526 err_reg:
527         mutex_up(&obd->obd_dev_sem);
528         obd_register_observer(mds->mds_osc_obd, NULL);
529 err_discon:
530         obd_disconnect(mds->mds_osc_exp);
531         mds->mds_osc_exp = NULL;
532         mds->mds_osc_obd = ERR_PTR(rc);
533         RETURN(rc);
534 }
535
536 int mds_lov_disconnect(struct obd_device *obd)
537 {
538         struct mds_obd *mds = &obd->u.mds;
539         int rc = 0;
540         ENTRY;
541
542         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
543                 obd_register_observer(mds->mds_osc_obd, NULL);
544
545                 /* The actual disconnect of the mds_lov will be called from
546                  * class_disconnect_exports from mds_lov_clean. So we have to
547                  * ensure that class_cleanup doesn't fail due to the extra ref
548                  * we're holding now. The mechanism to do that already exists -
549                  * the obd_force flag. We'll drop the final ref to the
550                  * mds_osc_exp in mds_cleanup. */
551                 mds->mds_osc_obd->obd_force = 1;
552         }
553
554         RETURN(rc);
555 }
556
557 /* Collect the preconditions we need to allow client connects */
558 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
559 {
560         if (flag & CONFIG_LOG)
561                 obd->u.mds.mds_fl_cfglog = 1;
562         if (flag & CONFIG_SYNC)
563                 obd->u.mds.mds_fl_synced = 1;
564         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
565                 /* Open for clients */
566                 obd->obd_no_conn = 0;
567 }
568
569 struct mds_lov_sync_info {
570         struct obd_device *mlsi_obd;     /* the lov device to sync */
571         struct obd_device *mlsi_watched; /* target osc */
572         __u32              mlsi_index;   /* index of target */
573 };
574
575 static int mds_propagate_capa_keys(struct mds_obd *mds)
576 {
577         struct lustre_capa_key *key;
578         int i, rc = 0;
579
580         ENTRY;
581
582         if (!mds->mds_capa_keys)
583                 RETURN(0);
584
585         for (i = 0; i < 2; i++) {
586                 key = &mds->mds_capa_keys[i];
587                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
588
589                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
590                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
591                 if (rc) {
592                         DEBUG_CAPA_KEY(D_ERROR, key,
593                                        "propagate failed (rc = %d) for", rc);
594                         RETURN(rc);
595                 }
596         }
597
598         RETURN(0);
599 }
600
601 /* We only sync one osc at a time, so that we don't have to hold
602    any kind of lock on the whole mds_lov_desc, which may change
603    (grow) as a result of mds_lov_add_ost.  This also avoids any
604    kind of mismatch between the lov_desc and the mds_lov_desc,
605    which are not in lock-step during lov_add_obd */
606 static int __mds_lov_synchronize(void *data)
607 {
608         struct mds_lov_sync_info *mlsi = data;
609         struct obd_device *obd = mlsi->mlsi_obd;
610         struct obd_device *watched = mlsi->mlsi_watched;
611         struct mds_obd *mds = &obd->u.mds;
612         struct obd_uuid *uuid;
613         __u32  idx = mlsi->mlsi_index;
614         struct mds_group_info mgi;
615         struct llog_ctxt *ctxt;
616         int rc = 0;
617         ENTRY;
618
619         OBD_FREE_PTR(mlsi);
620
621         LASSERT(obd);
622         LASSERT(watched);
623         uuid = &watched->u.cli.cl_target_uuid;
624         LASSERT(uuid);
625
626         down_read(&mds->mds_notify_lock);
627         if (obd->obd_stopping || obd->obd_fail)
628                 GOTO(out, rc = -ENODEV);
629
630         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
631         rc = mds_lov_update_mds(obd, watched, idx);
632         if (rc != 0) {
633                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
634                 GOTO(out, rc);
635         }
636         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
637         mgi.uuid = uuid;
638
639         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
640                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
641         if (rc != 0)
642                 GOTO(out, rc);
643         /* propagate capability keys */
644         rc = mds_propagate_capa_keys(mds);
645         if (rc)
646                 GOTO(out, rc);
647
648         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
649         if (!ctxt) 
650                 GOTO(out, rc = -ENODEV);
651
652         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
653         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
654                           NULL, NULL, uuid); 
655         llog_ctxt_put(ctxt);
656         if (rc != 0) {
657                 CERROR("%s failed at llog_origin_connect: %d\n",
658                        obd_uuid2str(uuid), rc);
659                 GOTO(out, rc);
660         }
661
662         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
663               obd->obd_name, obd_uuid2str(uuid));
664         rc = mds_lov_clear_orphans(mds, uuid);
665         if (rc != 0) {
666                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
667                        obd_uuid2str(uuid), rc);
668                 GOTO(out, rc);
669         }
670
671         if (obd->obd_upcall.onu_owner) {
672                 /*
673                  * This is a hack for mds_notify->mdd_notify. When the mds obd
674                  * in mdd is removed, This hack should be removed.
675                  */
676                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
677                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
678                                                  obd->obd_upcall.onu_owner);
679         }
680         EXIT;
681 out:
682         up_read(&mds->mds_notify_lock);
683         if (rc) {
684                 /* Deactivate it for safety */
685                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
686                        rc);
687                 if (!obd->obd_stopping && mds->mds_osc_obd &&
688                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
689                         obd_notify(mds->mds_osc_obd, watched,
690                                    OBD_NOTIFY_INACTIVE, NULL);
691         }
692
693         class_decref(obd);
694         return rc;
695 }
696
697 int mds_lov_synchronize(void *data)
698 {
699         struct mds_lov_sync_info *mlsi = data;
700         char name[20];
701
702         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
703         ptlrpc_daemonize(name);
704
705         RETURN(__mds_lov_synchronize(data));
706 }
707
708 int mds_lov_start_synchronize(struct obd_device *obd,
709                               struct obd_device *watched,
710                               void *data, int nonblock)
711 {
712         struct mds_lov_sync_info *mlsi;
713         int rc;
714         struct obd_uuid *uuid;
715         ENTRY;
716
717         LASSERT(watched);
718         uuid = &watched->u.cli.cl_target_uuid;
719
720         OBD_ALLOC(mlsi, sizeof(*mlsi));
721         if (mlsi == NULL)
722                 RETURN(-ENOMEM);
723
724         mlsi->mlsi_obd = obd;
725         mlsi->mlsi_watched = watched;
726         mlsi->mlsi_index = *(__u32 *)data;
727
728         /* Although class_export_get(obd->obd_self_export) would lock
729            the MDS in place, since it's only a self-export
730            it doesn't lock the LOV in place.  The LOV can be disconnected
731            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
732            Simply taking an export ref on the LOV doesn't help, because it's
733            still disconnected. Taking an obd reference insures that we don't
734            disconnect the LOV.  This of course means a cleanup won't
735            finish for as long as the sync is blocking. */
736         class_incref(obd);
737
738         if (nonblock) {
739                 /* Synchronize in the background */
740                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
741                                        CLONE_VM | CLONE_FILES);
742                 if (rc < 0) {
743                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
744                                obd->obd_name, rc);
745                         class_decref(obd);
746                 } else {
747                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
748                                "thread=%d\n", obd->obd_name,
749                                mlsi->mlsi_index, rc);
750                         rc = 0;
751                 }
752         } else {
753                 rc = __mds_lov_synchronize((void *)mlsi);
754         }
755
756         RETURN(rc);
757 }
758
759 int mds_notify(struct obd_device *obd, struct obd_device *watched,
760                enum obd_notify_event ev, void *data)
761 {
762         int rc = 0;
763         ENTRY;
764
765         switch (ev) {
766         /* We only handle these: */
767         case OBD_NOTIFY_ACTIVE:
768         case OBD_NOTIFY_SYNC:
769         case OBD_NOTIFY_SYNC_NONBLOCK:
770                 break;
771         case OBD_NOTIFY_CONFIG:
772                 mds_allow_cli(obd, (unsigned long)data);
773         default:
774                 RETURN(0);
775         }
776
777         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
778         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
779                 CERROR("unexpected notification of %s %s!\n",
780                        watched->obd_type->typ_name, watched->obd_name);
781                 RETURN(-EINVAL);
782         }
783
784         if (obd->obd_recovering) {
785                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
786                       obd->obd_name,
787                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
788                 /* We still have to fix the lov descriptor for ost's added
789                    after the mdt in the config log.  They didn't make it into
790                    mds_lov_connect. */
791                 mutex_down(&obd->obd_dev_sem);
792                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
793                                         &watched->u.cli.cl_target_uuid);
794                 mutex_up(&obd->obd_dev_sem);
795                 if (rc == 0)
796                         mds_allow_cli(obd, CONFIG_SYNC);
797                 RETURN(rc);
798         }
799
800         rc = mds_lov_start_synchronize(obd, watched, data,
801                                        !(ev == OBD_NOTIFY_SYNC));
802
803         lquota_recovery(mds_quota_interface_ref, obd);
804
805         RETURN(rc);
806 }