1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 #define DEBUG_SUBSYSTEM S_MDS
31 #include <linux/module.h>
32 #include <lustre_mds.h>
33 #include <lustre/lustre_idl.h>
34 #include <obd_class.h>
36 #include <lustre_lib.h>
37 #include <lustre_fsfilt.h>
39 #include "mds_internal.h"
41 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
43 struct mds_obd *mds = &obd->u.mds;
46 CDEBUG(D_INFO, "dump from %s\n", label);
47 if (mds->mds_lov_page_dirty == NULL) {
48 CERROR("NULL bitmap!\n");
52 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
53 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
55 if (mds->mds_lov_page_array == NULL) {
56 CERROR("not init page array!\n");
60 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
61 obd_id *data = mds->mds_lov_page_array[i];
66 for(j=0; j < OBJID_PER_PAGE(); j++) {
69 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
76 int mds_lov_init_objids(struct obd_device *obd)
78 struct mds_obd *mds = &obd->u.mds;
79 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
84 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
86 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
87 if (mds->mds_lov_page_dirty == NULL)
91 OBD_ALLOC(mds->mds_lov_page_array, size);
92 if (mds->mds_lov_page_array == NULL)
93 GOTO(err_free_bitmap, rc = -ENOMEM);
95 /* open and test the lov objd file */
96 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
99 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
100 GOTO(err_free, rc = PTR_ERR(file));
102 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
103 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
104 file->f_dentry->d_inode->i_mode);
105 GOTO(err_open, rc = -ENOENT);
107 mds->mds_lov_objid_filp = file;
111 if (filp_close((struct file *)file, 0))
112 CERROR("can't close %s after error\n", LOV_OBJID);
114 OBD_FREE(mds->mds_lov_page_array, size);
116 FREE_BITMAP(mds->mds_lov_page_dirty);
121 void mds_lov_destroy_objids(struct obd_device *obd)
123 struct mds_obd *mds = &obd->u.mds;
127 if (mds->mds_lov_page_array != NULL) {
128 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
129 obd_id *data = mds->mds_lov_page_array[i];
131 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
133 OBD_FREE(mds->mds_lov_page_array,
134 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
137 if (mds->mds_lov_objid_filp) {
138 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
139 mds->mds_lov_objid_filp = NULL;
141 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
144 FREE_BITMAP(mds->mds_lov_page_dirty);
148 static int mds_lov_read_objids(struct obd_device *obd)
150 struct mds_obd *mds = &obd->u.mds;
152 int i, rc, count = 0, page = 0;
156 /* Read everything in the file, even if our current lov desc
157 has fewer targets. Old targets not in the lov descriptor
158 during mds setup may still have valid objids. */
159 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
163 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
164 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
166 for (i = 0; i < page; i++) {
167 obd_id *data = mds->mds_lov_page_array[i];
168 loff_t off_old = off;
170 LASSERT(data == NULL);
171 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
173 GOTO(out, rc = -ENOMEM);
175 mds->mds_lov_page_array[i] = data;
177 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
178 OBJID_PER_PAGE()*sizeof(obd_id), &off);
180 CERROR("Error reading objids %d\n", rc);
186 count += (off - off_old)/sizeof(obd_id);
188 mds->mds_lov_objid_count = count;
191 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
192 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
194 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
195 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
197 mds_lov_dump_objids("read",obd);
202 int mds_lov_write_objids(struct obd_device *obd)
204 struct mds_obd *mds = &obd->u.mds;
208 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
211 mds_lov_dump_objids("write", obd);
213 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
214 obd_id *data = mds->mds_lov_page_array[i];
215 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
216 loff_t off = i * size;
218 LASSERT(data != NULL);
220 /* check for particaly filled last page */
221 if (i == mds->mds_lov_objid_lastpage)
222 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
224 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
228 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
235 EXPORT_SYMBOL(mds_lov_write_objids);
237 static int mds_lov_get_objid(struct obd_device * obd,
240 struct mds_obd *mds = &obd->u.mds;
247 page = idx / OBJID_PER_PAGE();
248 off = idx % OBJID_PER_PAGE();
249 data = mds->mds_lov_page_array[page];
251 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
253 GOTO(out, rc = -ENOMEM);
255 mds->mds_lov_page_array[page] = data;
258 if (data[off] == 0) {
259 /* We never read this lastid; ask the osc */
260 struct obd_id_info lastid;
261 __u32 size = sizeof(lastid);
264 lastid.data = &data[off];
265 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
266 KEY_LAST_ID, &size, &lastid);
270 if (idx > mds->mds_lov_objid_count) {
271 mds->mds_lov_objid_count = idx;
272 mds->mds_lov_objid_lastpage = page;
273 mds->mds_lov_objid_lastidx = off;
275 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
281 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
285 struct obd_trans_info oti = {0};
286 struct lov_stripe_md *empty_ea = NULL;
289 LASSERT(mds->mds_lov_page_array != NULL);
291 /* This create will in fact either create or destroy: If the OST is
292 * missing objects below this ID, they will be created. If it finds
293 * objects above this ID, they will be removed. */
294 memset(&oa, 0, sizeof(oa));
295 oa.o_flags = OBD_FL_DELORPHAN;
296 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
297 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
298 if (ost_uuid != NULL) {
299 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
300 oa.o_valid |= OBD_MD_FLINLINE;
302 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
308 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
310 struct mds_obd *mds = &obd->u.mds;
312 struct obd_id_info info;
315 LASSERT(!obd->obd_recovering);
317 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
318 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
322 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
323 KEY_NEXT_ID, sizeof(info), &info, NULL);
325 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
331 static __u32 mds_lov_get_idx(struct obd_export *lov,
332 struct obd_uuid *ost_uuid)
335 int valsize = sizeof(ost_uuid);
337 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
344 /* Update the lov desc for a new size lov. */
345 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
347 struct mds_obd *mds = &obd->u.mds;
349 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
353 OBD_ALLOC(ld, sizeof(*ld));
357 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
362 /* Don't change the mds_lov_desc until the objids size matches the
364 mds->mds_lov_desc = *ld;
365 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
366 mds->mds_lov_desc.ld_tgt_count);
368 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369 mds->mds_lov_desc.ld_tgt_count);
371 mds->mds_max_mdsize = lov_mds_md_size(stripes);
372 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
374 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
377 /* If we added a target we have to reconnect the llogs */
378 /* We only _need_ to do this at first add (idx), or the first time
379 after recovery. However, it should now be safe to call anytime. */
380 rc = llog_cat_initialize(obd, &obd->obd_olg,
381 mds->mds_lov_desc.ld_tgt_count, NULL);
383 /*XXX this notifies the MDD until lov handling use old mds code */
384 if (obd->obd_upcall.onu_owner) {
385 LASSERT(obd->obd_upcall.onu_upcall != NULL);
386 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
387 obd->obd_upcall.onu_owner);
390 OBD_FREE(ld, sizeof(*ld));
395 #define MDSLOV_NO_INDEX -1
397 /* Inform MDS about new/updated target */
398 static int mds_lov_update_mds(struct obd_device *obd,
399 struct obd_device *watched,
402 struct mds_obd *mds = &obd->u.mds;
410 /* Don't let anyone else mess with mds_lov_objids now */
411 mutex_down(&obd->obd_dev_sem);
413 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
417 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
418 idx, obd->obd_recovering, obd->obd_async_recov,
419 mds->mds_lov_desc.ld_tgt_count);
421 /* idx is set as data from lov_notify. */
422 if (obd->obd_recovering)
425 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
426 CERROR("index %d > count %d!\n", idx,
427 mds->mds_lov_desc.ld_tgt_count);
428 GOTO(out, rc = -EINVAL);
431 rc = mds_lov_get_objid(obd, idx);
435 page = idx / OBJID_PER_PAGE();
436 off = idx % OBJID_PER_PAGE();
437 data = mds->mds_lov_page_array[page];
439 /* We have read this lastid from disk; tell the osc.
440 Don't call this during recovery. */
441 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
443 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
444 /* Don't abort the rest of the sync */
447 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
451 mutex_up(&obd->obd_dev_sem);
455 /* update the LOV-OSC knowledge of the last used object id's */
456 int mds_lov_connect(struct obd_device *obd, char * lov_name)
458 struct mds_obd *mds = &obd->u.mds;
459 struct lustre_handle conn = {0,};
460 struct obd_connect_data *data;
464 if (IS_ERR(mds->mds_osc_obd))
465 RETURN(PTR_ERR(mds->mds_osc_obd));
467 if (mds->mds_osc_obd)
470 mds->mds_osc_obd = class_name2obd(lov_name);
471 if (!mds->mds_osc_obd) {
472 CERROR("MDS cannot locate LOV %s\n", lov_name);
473 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
477 OBD_ALLOC(data, sizeof(*data));
480 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
481 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
482 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
484 #ifdef HAVE_LRU_RESIZE_SUPPORT
485 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
487 data->ocd_version = LUSTRE_VERSION_CODE;
488 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
489 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
490 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
491 OBD_FREE(data, sizeof(*data));
493 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
494 mds->mds_osc_obd = ERR_PTR(rc);
497 mds->mds_osc_exp = class_conn2export(&conn);
499 rc = obd_register_observer(mds->mds_osc_obd, obd);
501 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
503 GOTO(err_discon, rc);
506 /* Deny new client connections until we are sure we have some OSTs */
507 obd->obd_no_conn = 1;
509 mutex_down(&obd->obd_dev_sem);
510 rc = mds_lov_read_objids(obd);
512 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
516 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
520 /* tgt_count may be 0! */
521 rc = llog_cat_initialize(obd, &obd->obd_olg,
522 mds->mds_lov_desc.ld_tgt_count, NULL);
524 CERROR("failed to initialize catalog %d\n", rc);
528 /* If we're mounting this code for the first time on an existing FS,
529 * we need to populate the objids array from the real OST values */
530 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
531 __u32 i = mds->mds_lov_objid_count;
532 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
533 rc = mds_lov_get_objid(obd, i);
538 rc = mds_lov_write_objids(obd);
540 CERROR("got last objids from OSTs, but error "
541 "in update objids file: %d\n", rc);
543 mutex_up(&obd->obd_dev_sem);
545 /* I want to see a callback happen when the OBD moves to a
546 * "For General Use" state, and that's when we'll call
547 * set_nextid(). The class driver can help us here, because
548 * it can use the obd_recovering flag to determine when the
549 * the OBD is full available. */
550 /* MDD device will care about that
551 if (!obd->obd_recovering)
552 rc = mds_postrecov(obd);
557 mutex_up(&obd->obd_dev_sem);
558 obd_register_observer(mds->mds_osc_obd, NULL);
560 obd_disconnect(mds->mds_osc_exp);
561 mds->mds_osc_exp = NULL;
562 mds->mds_osc_obd = ERR_PTR(rc);
566 int mds_lov_disconnect(struct obd_device *obd)
568 struct mds_obd *mds = &obd->u.mds;
572 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
573 obd_register_observer(mds->mds_osc_obd, NULL);
575 /* The actual disconnect of the mds_lov will be called from
576 * class_disconnect_exports from mds_lov_clean. So we have to
577 * ensure that class_cleanup doesn't fail due to the extra ref
578 * we're holding now. The mechanism to do that already exists -
579 * the obd_force flag. We'll drop the final ref to the
580 * mds_osc_exp in mds_cleanup. */
581 mds->mds_osc_obd->obd_force = 1;
587 /* Collect the preconditions we need to allow client connects */
588 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
590 if (flag & CONFIG_LOG)
591 obd->u.mds.mds_fl_cfglog = 1;
592 if (flag & CONFIG_SYNC)
593 obd->u.mds.mds_fl_synced = 1;
594 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
595 /* Open for clients */
596 obd->obd_no_conn = 0;
599 struct mds_lov_sync_info {
600 struct obd_device *mlsi_obd; /* the lov device to sync */
601 struct obd_device *mlsi_watched; /* target osc */
602 __u32 mlsi_index; /* index of target */
605 static int mds_propagate_capa_keys(struct mds_obd *mds)
607 struct lustre_capa_key *key;
612 if (!mds->mds_capa_keys)
615 for (i = 0; i < 2; i++) {
616 key = &mds->mds_capa_keys[i];
617 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
619 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
620 KEY_CAPA_KEY, sizeof(*key), key, NULL);
622 DEBUG_CAPA_KEY(D_ERROR, key,
623 "propagate failed (rc = %d) for", rc);
631 /* We only sync one osc at a time, so that we don't have to hold
632 any kind of lock on the whole mds_lov_desc, which may change
633 (grow) as a result of mds_lov_add_ost. This also avoids any
634 kind of mismatch between the lov_desc and the mds_lov_desc,
635 which are not in lock-step during lov_add_obd */
636 static int __mds_lov_synchronize(void *data)
638 struct mds_lov_sync_info *mlsi = data;
639 struct obd_device *obd = mlsi->mlsi_obd;
640 struct obd_device *watched = mlsi->mlsi_watched;
641 struct mds_obd *mds = &obd->u.mds;
642 struct obd_uuid *uuid;
643 __u32 idx = mlsi->mlsi_index;
644 struct mds_group_info mgi;
645 struct llog_ctxt *ctxt;
653 uuid = &watched->u.cli.cl_target_uuid;
656 down_read(&mds->mds_notify_lock);
657 if (obd->obd_stopping || obd->obd_fail)
658 GOTO(out, rc = -ENODEV);
660 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
661 rc = mds_lov_update_mds(obd, watched, idx);
663 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
666 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
669 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
670 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
673 /* propagate capability keys */
674 rc = mds_propagate_capa_keys(mds);
678 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
680 GOTO(out, rc = -ENODEV);
682 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
683 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
687 CERROR("%s failed at llog_origin_connect: %d\n",
688 obd_uuid2str(uuid), rc);
692 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
693 obd->obd_name, obd_uuid2str(uuid));
694 rc = mds_lov_clear_orphans(mds, uuid);
696 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
697 obd_uuid2str(uuid), rc);
701 if (obd->obd_upcall.onu_owner) {
703 * This is a hack for mds_notify->mdd_notify. When the mds obd
704 * in mdd is removed, This hack should be removed.
706 LASSERT(obd->obd_upcall.onu_upcall != NULL);
707 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
708 obd->obd_upcall.onu_owner);
712 up_read(&mds->mds_notify_lock);
714 /* Deactivate it for safety */
715 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
717 if (!obd->obd_stopping && mds->mds_osc_obd &&
718 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
719 obd_notify(mds->mds_osc_obd, watched,
720 OBD_NOTIFY_INACTIVE, NULL);
727 int mds_lov_synchronize(void *data)
729 struct mds_lov_sync_info *mlsi = data;
732 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
733 ptlrpc_daemonize(name);
735 RETURN(__mds_lov_synchronize(data));
738 int mds_lov_start_synchronize(struct obd_device *obd,
739 struct obd_device *watched,
740 void *data, int nonblock)
742 struct mds_lov_sync_info *mlsi;
744 struct mds_obd *mds = &obd->u.mds;
745 struct obd_uuid *uuid;
749 uuid = &watched->u.cli.cl_target_uuid;
751 OBD_ALLOC(mlsi, sizeof(*mlsi));
755 mlsi->mlsi_obd = obd;
756 mlsi->mlsi_watched = watched;
758 mlsi->mlsi_index = *(__u32 *)data;
760 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
762 /* Although class_export_get(obd->obd_self_export) would lock
763 the MDS in place, since it's only a self-export
764 it doesn't lock the LOV in place. The LOV can be disconnected
765 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
766 Simply taking an export ref on the LOV doesn't help, because it's
767 still disconnected. Taking an obd reference insures that we don't
768 disconnect the LOV. This of course means a cleanup won't
769 finish for as long as the sync is blocking. */
773 /* Synchronize in the background */
774 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
775 CLONE_VM | CLONE_FILES);
777 CERROR("%s: error starting mds_lov_synchronize: %d\n",
781 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
782 "thread=%d\n", obd->obd_name,
783 mlsi->mlsi_index, rc);
787 rc = __mds_lov_synchronize((void *)mlsi);
793 int mds_notify(struct obd_device *obd, struct obd_device *watched,
794 enum obd_notify_event ev, void *data)
800 /* We only handle these: */
801 case OBD_NOTIFY_ACTIVE:
802 case OBD_NOTIFY_SYNC:
803 case OBD_NOTIFY_SYNC_NONBLOCK:
805 case OBD_NOTIFY_CONFIG:
806 mds_allow_cli(obd, (unsigned long)data);
811 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
812 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
813 CERROR("unexpected notification of %s %s!\n",
814 watched->obd_type->typ_name, watched->obd_name);
818 if (obd->obd_recovering) {
819 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
821 obd_uuid2str(&watched->u.cli.cl_target_uuid));
822 /* We still have to fix the lov descriptor for ost's added
823 after the mdt in the config log. They didn't make it into
825 mutex_down(&obd->obd_dev_sem);
826 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
828 mutex_up(&obd->obd_dev_sem);
831 /* We should update init llog here too for replay unlink and
832 * possiable llog init race when recovery complete */
833 llog_cat_initialize(obd, &obd->obd_olg,
834 obd->u.mds.mds_lov_desc.ld_tgt_count,
835 &watched->u.cli.cl_target_uuid);
836 mutex_up(&obd->obd_dev_sem);
837 mds_allow_cli(obd, CONFIG_SYNC);
841 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
842 rc = mds_lov_start_synchronize(obd, watched, data,
843 !(ev == OBD_NOTIFY_SYNC));
845 lquota_recovery(mds_quota_interface_ref, obd);