1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
53 #include "mds_internal.h"
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 struct mds_obd *mds = &obd->u.mds;
60 CDEBUG(D_INFO, "dump from %s\n", label);
61 if (mds->mds_lov_page_dirty == NULL) {
62 CERROR("NULL bitmap!\n");
66 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 if (mds->mds_lov_page_array == NULL) {
70 CERROR("not init page array!\n");
74 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75 obd_id *data = mds->mds_lov_page_array[i];
80 for(j=0; j < OBJID_PER_PAGE(); j++) {
83 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
90 int mds_lov_init_objids(struct obd_device *obd)
92 struct mds_obd *mds = &obd->u.mds;
93 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
98 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101 if (mds->mds_lov_page_dirty == NULL)
105 OBD_ALLOC(mds->mds_lov_page_array, size);
106 if (mds->mds_lov_page_array == NULL)
107 GOTO(err_free_bitmap, rc = -ENOMEM);
109 /* open and test the lov objd file */
110 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
113 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114 GOTO(err_free, rc = PTR_ERR(file));
116 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118 file->f_dentry->d_inode->i_mode);
119 GOTO(err_open, rc = -ENOENT);
121 mds->mds_lov_objid_filp = file;
125 if (filp_close((struct file *)file, 0))
126 CERROR("can't close %s after error\n", LOV_OBJID);
128 OBD_FREE(mds->mds_lov_page_array, size);
130 FREE_BITMAP(mds->mds_lov_page_dirty);
135 void mds_lov_destroy_objids(struct obd_device *obd)
137 struct mds_obd *mds = &obd->u.mds;
141 if (mds->mds_lov_page_array != NULL) {
142 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143 obd_id *data = mds->mds_lov_page_array[i];
145 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
147 OBD_FREE(mds->mds_lov_page_array,
148 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
151 if (mds->mds_lov_objid_filp) {
152 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153 mds->mds_lov_objid_filp = NULL;
155 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
158 FREE_BITMAP(mds->mds_lov_page_dirty);
162 static int mds_lov_read_objids(struct obd_device *obd)
164 struct mds_obd *mds = &obd->u.mds;
166 int i, rc, count = 0, page = 0;
170 /* Read everything in the file, even if our current lov desc
171 has fewer targets. Old targets not in the lov descriptor
172 during mds setup may still have valid objids. */
173 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
177 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
180 for (i = 0; i < page; i++) {
181 obd_id *data = mds->mds_lov_page_array[i];
182 loff_t off_old = off;
184 LASSERT(data == NULL);
185 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
187 GOTO(out, rc = -ENOMEM);
189 mds->mds_lov_page_array[i] = data;
191 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192 OBJID_PER_PAGE()*sizeof(obd_id), &off);
194 CERROR("Error reading objids %d\n", rc);
200 count += (off - off_old)/sizeof(obd_id);
202 mds->mds_lov_objid_count = count;
205 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
208 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
211 mds_lov_dump_objids("read",obd);
216 int mds_lov_write_objids(struct obd_device *obd)
218 struct mds_obd *mds = &obd->u.mds;
222 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
225 mds_lov_dump_objids("write", obd);
227 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228 obd_id *data = mds->mds_lov_page_array[i];
229 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230 loff_t off = i * size;
232 LASSERT(data != NULL);
234 /* check for particaly filled last page */
235 if (i == mds->mds_lov_objid_lastpage)
236 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
238 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
242 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
249 EXPORT_SYMBOL(mds_lov_write_objids);
251 static int mds_lov_get_objid(struct obd_device * obd,
254 struct mds_obd *mds = &obd->u.mds;
261 page = idx / OBJID_PER_PAGE();
262 off = idx % OBJID_PER_PAGE();
263 data = mds->mds_lov_page_array[page];
265 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
267 GOTO(out, rc = -ENOMEM);
269 mds->mds_lov_page_array[page] = data;
272 if (data[off] == 0) {
273 /* We never read this lastid; ask the osc */
274 struct obd_id_info lastid;
275 __u32 size = sizeof(lastid);
278 lastid.data = &data[off];
279 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280 KEY_LAST_ID, &size, &lastid, NULL);
284 if (idx > mds->mds_lov_objid_count) {
285 mds->mds_lov_objid_count = idx;
286 mds->mds_lov_objid_lastpage = page;
287 mds->mds_lov_objid_lastidx = off;
289 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
299 struct obd_trans_info oti = {0};
300 struct lov_stripe_md *empty_ea = NULL;
303 LASSERT(mds->mds_lov_page_array != NULL);
305 /* This create will in fact either create or destroy: If the OST is
306 * missing objects below this ID, they will be created. If it finds
307 * objects above this ID, they will be removed. */
308 memset(&oa, 0, sizeof(oa));
309 oa.o_flags = OBD_FL_DELORPHAN;
310 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312 if (ost_uuid != NULL)
313 oti.oti_ost_uuid = ost_uuid;
314 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
322 struct mds_obd *mds = &obd->u.mds;
324 struct obd_id_info info;
327 LASSERT(!obd->obd_recovering);
329 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
334 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335 KEY_NEXT_ID, sizeof(info), &info, NULL);
337 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
346 struct mds_obd *mds = &obd->u.mds;
348 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
352 OBD_ALLOC(ld, sizeof(*ld));
356 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
361 /* Don't change the mds_lov_desc until the objids size matches the
363 mds->mds_lov_desc = *ld;
364 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
365 mds->mds_lov_desc.ld_tgt_count);
367 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
368 mds->mds_lov_desc.ld_tgt_count);
370 mds->mds_max_mdsize = lov_mds_md_size(stripes);
371 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
372 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
373 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
376 /* If we added a target we have to reconnect the llogs */
377 /* We only _need_ to do this at first add (idx), or the first time
378 after recovery. However, it should now be safe to call anytime. */
379 rc = llog_cat_initialize(obd, &obd->obd_olg,
380 mds->mds_lov_desc.ld_tgt_count, NULL);
382 /*XXX this notifies the MDD until lov handling use old mds code */
383 if (obd->obd_upcall.onu_owner) {
384 LASSERT(obd->obd_upcall.onu_upcall != NULL);
385 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
386 obd->obd_upcall.onu_owner);
389 OBD_FREE(ld, sizeof(*ld));
393 /* Inform MDS about new/updated target */
394 static int mds_lov_update_mds(struct obd_device *obd,
395 struct obd_device *watched,
398 struct mds_obd *mds = &obd->u.mds;
406 /* Don't let anyone else mess with mds_lov_objids now */
407 mutex_down(&obd->obd_dev_sem);
409 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
413 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
414 idx, obd->obd_recovering, obd->obd_async_recov,
415 mds->mds_lov_desc.ld_tgt_count);
417 /* idx is set as data from lov_notify. */
418 if (obd->obd_recovering)
421 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
422 CERROR("index %d > count %d!\n", idx,
423 mds->mds_lov_desc.ld_tgt_count);
424 GOTO(out, rc = -EINVAL);
427 rc = mds_lov_get_objid(obd, idx);
431 page = idx / OBJID_PER_PAGE();
432 off = idx % OBJID_PER_PAGE();
433 data = mds->mds_lov_page_array[page];
435 /* We have read this lastid from disk; tell the osc.
436 Don't call this during recovery. */
437 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
439 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
440 /* Don't abort the rest of the sync */
443 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
447 mutex_up(&obd->obd_dev_sem);
451 /* update the LOV-OSC knowledge of the last used object id's */
452 int mds_lov_connect(struct obd_device *obd, char * lov_name)
454 struct mds_obd *mds = &obd->u.mds;
455 struct lustre_handle conn = {0,};
456 struct obd_connect_data *data;
460 if (IS_ERR(mds->mds_osc_obd))
461 RETURN(PTR_ERR(mds->mds_osc_obd));
463 if (mds->mds_osc_obd)
466 mds->mds_osc_obd = class_name2obd(lov_name);
467 if (!mds->mds_osc_obd) {
468 CERROR("MDS cannot locate LOV %s\n", lov_name);
469 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
473 OBD_ALLOC(data, sizeof(*data));
476 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
477 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
478 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
480 #ifdef HAVE_LRU_RESIZE_SUPPORT
481 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
483 data->ocd_version = LUSTRE_VERSION_CODE;
484 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
485 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
486 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
487 OBD_FREE(data, sizeof(*data));
489 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
490 mds->mds_osc_obd = ERR_PTR(rc);
493 mds->mds_osc_exp = class_conn2export(&conn);
495 rc = obd_register_observer(mds->mds_osc_obd, obd);
497 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
499 GOTO(err_discon, rc);
502 /* Deny new client connections until we are sure we have some OSTs */
503 obd->obd_no_conn = 1;
505 mutex_down(&obd->obd_dev_sem);
506 rc = mds_lov_read_objids(obd);
508 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
512 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
516 /* tgt_count may be 0! */
517 rc = llog_cat_initialize(obd, &obd->obd_olg,
518 mds->mds_lov_desc.ld_tgt_count, NULL);
520 CERROR("failed to initialize catalog %d\n", rc);
524 /* If we're mounting this code for the first time on an existing FS,
525 * we need to populate the objids array from the real OST values */
526 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
527 __u32 i = mds->mds_lov_objid_count;
528 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
529 rc = mds_lov_get_objid(obd, i);
534 rc = mds_lov_write_objids(obd);
536 CERROR("got last objids from OSTs, but error "
537 "in update objids file: %d\n", rc);
539 mutex_up(&obd->obd_dev_sem);
541 /* I want to see a callback happen when the OBD moves to a
542 * "For General Use" state, and that's when we'll call
543 * set_nextid(). The class driver can help us here, because
544 * it can use the obd_recovering flag to determine when the
545 * the OBD is full available. */
546 /* MDD device will care about that
547 if (!obd->obd_recovering)
548 rc = mds_postrecov(obd);
553 mutex_up(&obd->obd_dev_sem);
554 obd_register_observer(mds->mds_osc_obd, NULL);
556 obd_disconnect(mds->mds_osc_exp);
557 mds->mds_osc_exp = NULL;
558 mds->mds_osc_obd = ERR_PTR(rc);
562 int mds_lov_disconnect(struct obd_device *obd)
564 struct mds_obd *mds = &obd->u.mds;
568 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
569 obd_register_observer(mds->mds_osc_obd, NULL);
571 /* The actual disconnect of the mds_lov will be called from
572 * class_disconnect_exports from mds_lov_clean. So we have to
573 * ensure that class_cleanup doesn't fail due to the extra ref
574 * we're holding now. The mechanism to do that already exists -
575 * the obd_force flag. We'll drop the final ref to the
576 * mds_osc_exp in mds_cleanup. */
577 mds->mds_osc_obd->obd_force = 1;
583 /* Collect the preconditions we need to allow client connects */
584 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
586 if (flag & CONFIG_LOG)
587 obd->u.mds.mds_fl_cfglog = 1;
588 if (flag & CONFIG_SYNC)
589 obd->u.mds.mds_fl_synced = 1;
590 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
591 /* Open for clients */
592 obd->obd_no_conn = 0;
595 struct mds_lov_sync_info {
596 struct obd_device *mlsi_obd; /* the lov device to sync */
597 struct obd_device *mlsi_watched; /* target osc */
598 __u32 mlsi_index; /* index of target */
601 static int mds_propagate_capa_keys(struct mds_obd *mds)
603 struct lustre_capa_key *key;
608 if (!mds->mds_capa_keys)
611 for (i = 0; i < 2; i++) {
612 key = &mds->mds_capa_keys[i];
613 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
615 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
616 KEY_CAPA_KEY, sizeof(*key), key, NULL);
618 DEBUG_CAPA_KEY(D_ERROR, key,
619 "propagate failed (rc = %d) for", rc);
627 /* We only sync one osc at a time, so that we don't have to hold
628 any kind of lock on the whole mds_lov_desc, which may change
629 (grow) as a result of mds_lov_add_ost. This also avoids any
630 kind of mismatch between the lov_desc and the mds_lov_desc,
631 which are not in lock-step during lov_add_obd */
632 static int __mds_lov_synchronize(void *data)
634 struct mds_lov_sync_info *mlsi = data;
635 struct obd_device *obd = mlsi->mlsi_obd;
636 struct obd_device *watched = mlsi->mlsi_watched;
637 struct mds_obd *mds = &obd->u.mds;
638 struct obd_uuid *uuid;
639 __u32 idx = mlsi->mlsi_index;
640 struct mds_group_info mgi;
641 struct llog_ctxt *ctxt;
649 uuid = &watched->u.cli.cl_target_uuid;
652 down_read(&mds->mds_notify_lock);
653 if (obd->obd_stopping || obd->obd_fail)
654 GOTO(out, rc = -ENODEV);
656 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
657 rc = mds_lov_update_mds(obd, watched, idx);
659 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
662 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
665 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
666 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
669 /* propagate capability keys */
670 rc = mds_propagate_capa_keys(mds);
674 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
676 GOTO(out, rc = -ENODEV);
678 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
679 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
683 CERROR("%s failed at llog_origin_connect: %d\n",
684 obd_uuid2str(uuid), rc);
688 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
689 obd->obd_name, obd_uuid2str(uuid));
690 rc = mds_lov_clear_orphans(mds, uuid);
692 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
693 obd_uuid2str(uuid), rc);
697 if (obd->obd_upcall.onu_owner) {
699 * This is a hack for mds_notify->mdd_notify. When the mds obd
700 * in mdd is removed, This hack should be removed.
702 LASSERT(obd->obd_upcall.onu_upcall != NULL);
703 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
704 obd->obd_upcall.onu_owner);
708 up_read(&mds->mds_notify_lock);
710 /* Deactivate it for safety */
711 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
713 if (!obd->obd_stopping && mds->mds_osc_obd &&
714 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
715 obd_notify(mds->mds_osc_obd, watched,
716 OBD_NOTIFY_INACTIVE, NULL);
723 int mds_lov_synchronize(void *data)
725 struct mds_lov_sync_info *mlsi = data;
728 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
729 ptlrpc_daemonize(name);
731 RETURN(__mds_lov_synchronize(data));
734 int mds_lov_start_synchronize(struct obd_device *obd,
735 struct obd_device *watched,
736 void *data, int nonblock)
738 struct mds_lov_sync_info *mlsi;
740 struct obd_uuid *uuid;
744 uuid = &watched->u.cli.cl_target_uuid;
746 OBD_ALLOC(mlsi, sizeof(*mlsi));
750 mlsi->mlsi_obd = obd;
751 mlsi->mlsi_watched = watched;
752 mlsi->mlsi_index = *(__u32 *)data;
754 /* Although class_export_get(obd->obd_self_export) would lock
755 the MDS in place, since it's only a self-export
756 it doesn't lock the LOV in place. The LOV can be disconnected
757 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
758 Simply taking an export ref on the LOV doesn't help, because it's
759 still disconnected. Taking an obd reference insures that we don't
760 disconnect the LOV. This of course means a cleanup won't
761 finish for as long as the sync is blocking. */
765 /* Synchronize in the background */
766 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
767 CLONE_VM | CLONE_FILES);
769 CERROR("%s: error starting mds_lov_synchronize: %d\n",
773 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
774 "thread=%d\n", obd->obd_name,
775 mlsi->mlsi_index, rc);
779 rc = __mds_lov_synchronize((void *)mlsi);
785 int mds_notify(struct obd_device *obd, struct obd_device *watched,
786 enum obd_notify_event ev, void *data)
792 /* We only handle these: */
793 case OBD_NOTIFY_ACTIVE:
794 case OBD_NOTIFY_SYNC:
795 case OBD_NOTIFY_SYNC_NONBLOCK:
797 case OBD_NOTIFY_CONFIG:
798 mds_allow_cli(obd, (unsigned long)data);
803 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
804 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
805 CERROR("unexpected notification of %s %s!\n",
806 watched->obd_type->typ_name, watched->obd_name);
810 if (obd->obd_recovering) {
811 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
813 obd_uuid2str(&watched->u.cli.cl_target_uuid));
814 /* We still have to fix the lov descriptor for ost's added
815 after the mdt in the config log. They didn't make it into
817 mutex_down(&obd->obd_dev_sem);
818 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
820 mutex_up(&obd->obd_dev_sem);
823 /* We should update init llog here too for replay unlink and
824 * possiable llog init race when recovery complete */
825 llog_cat_initialize(obd, &obd->obd_olg,
826 obd->u.mds.mds_lov_desc.ld_tgt_count,
827 &watched->u.cli.cl_target_uuid);
828 mutex_up(&obd->obd_dev_sem);
829 mds_allow_cli(obd, CONFIG_SYNC);
833 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
834 rc = mds_lov_start_synchronize(obd, watched, data,
835 !(ev == OBD_NOTIFY_SYNC));
837 lquota_recovery(mds_quota_interface_ref, obd);