1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
53 #include "mds_internal.h"
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 struct mds_obd *mds = &obd->u.mds;
60 CDEBUG(D_INFO, "dump from %s\n", label);
61 if (mds->mds_lov_page_dirty == NULL) {
62 CERROR("NULL bitmap!\n");
66 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 if (mds->mds_lov_page_array == NULL) {
70 CERROR("not init page array!\n");
74 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75 obd_id *data = mds->mds_lov_page_array[i];
80 for(j=0; j < OBJID_PER_PAGE(); j++) {
83 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
90 int mds_lov_init_objids(struct obd_device *obd)
92 struct mds_obd *mds = &obd->u.mds;
93 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
98 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101 if (mds->mds_lov_page_dirty == NULL)
105 OBD_ALLOC(mds->mds_lov_page_array, size);
106 if (mds->mds_lov_page_array == NULL)
107 GOTO(err_free_bitmap, rc = -ENOMEM);
109 /* open and test the lov objd file */
110 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
113 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114 GOTO(err_free, rc = PTR_ERR(file));
116 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118 file->f_dentry->d_inode->i_mode);
119 GOTO(err_open, rc = -ENOENT);
121 mds->mds_lov_objid_filp = file;
125 if (filp_close((struct file *)file, 0))
126 CERROR("can't close %s after error\n", LOV_OBJID);
128 OBD_FREE(mds->mds_lov_page_array, size);
130 FREE_BITMAP(mds->mds_lov_page_dirty);
135 void mds_lov_destroy_objids(struct obd_device *obd)
137 struct mds_obd *mds = &obd->u.mds;
141 if (mds->mds_lov_page_array != NULL) {
142 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143 obd_id *data = mds->mds_lov_page_array[i];
145 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
147 OBD_FREE(mds->mds_lov_page_array,
148 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
151 if (mds->mds_lov_objid_filp) {
152 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153 mds->mds_lov_objid_filp = NULL;
155 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
158 FREE_BITMAP(mds->mds_lov_page_dirty);
162 static int mds_lov_read_objids(struct obd_device *obd)
164 struct mds_obd *mds = &obd->u.mds;
166 int i, rc, count = 0, page = 0;
170 /* Read everything in the file, even if our current lov desc
171 has fewer targets. Old targets not in the lov descriptor
172 during mds setup may still have valid objids. */
173 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
177 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
180 for (i = 0; i < page; i++) {
181 obd_id *data = mds->mds_lov_page_array[i];
182 loff_t off_old = off;
184 LASSERT(data == NULL);
185 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
187 GOTO(out, rc = -ENOMEM);
189 mds->mds_lov_page_array[i] = data;
191 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192 OBJID_PER_PAGE()*sizeof(obd_id), &off);
194 CERROR("Error reading objids %d\n", rc);
200 count += (off - off_old)/sizeof(obd_id);
202 mds->mds_lov_objid_count = count;
205 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
208 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
211 mds_lov_dump_objids("read",obd);
216 int mds_lov_write_objids(struct obd_device *obd)
218 struct mds_obd *mds = &obd->u.mds;
222 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
225 mds_lov_dump_objids("write", obd);
227 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228 obd_id *data = mds->mds_lov_page_array[i];
229 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230 loff_t off = i * size;
232 LASSERT(data != NULL);
234 /* check for particaly filled last page */
235 if (i == mds->mds_lov_objid_lastpage)
236 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
238 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
242 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
249 EXPORT_SYMBOL(mds_lov_write_objids);
251 static int mds_lov_get_objid(struct obd_device * obd,
254 struct mds_obd *mds = &obd->u.mds;
261 page = idx / OBJID_PER_PAGE();
262 off = idx % OBJID_PER_PAGE();
263 data = mds->mds_lov_page_array[page];
265 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
267 GOTO(out, rc = -ENOMEM);
269 mds->mds_lov_page_array[page] = data;
272 if (data[off] == 0) {
273 /* We never read this lastid; ask the osc */
274 struct obd_id_info lastid;
275 __u32 size = sizeof(lastid);
278 lastid.data = &data[off];
279 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280 KEY_LAST_ID, &size, &lastid, NULL);
284 if (idx > mds->mds_lov_objid_count) {
285 mds->mds_lov_objid_count = idx;
286 mds->mds_lov_objid_lastpage = page;
287 mds->mds_lov_objid_lastidx = off;
289 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
299 struct obd_trans_info oti = {0};
300 struct lov_stripe_md *empty_ea = NULL;
303 LASSERT(mds->mds_lov_page_array != NULL);
305 /* This create will in fact either create or destroy: If the OST is
306 * missing objects below this ID, they will be created. If it finds
307 * objects above this ID, they will be removed. */
308 memset(&oa, 0, sizeof(oa));
309 oa.o_flags = OBD_FL_DELORPHAN;
310 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312 if (ost_uuid != NULL)
313 oti.oti_ost_uuid = ost_uuid;
314 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
322 struct mds_obd *mds = &obd->u.mds;
324 struct obd_id_info info;
327 LASSERT(!obd->obd_recovering);
329 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
334 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335 KEY_NEXT_ID, sizeof(info), &info, NULL);
337 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
343 static __u32 mds_lov_get_idx(struct obd_export *lov,
344 struct obd_uuid *ost_uuid)
347 int valsize = sizeof(ost_uuid);
349 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
350 &valsize, ost_uuid, NULL);
356 /* Update the lov desc for a new size lov. */
357 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
359 struct mds_obd *mds = &obd->u.mds;
361 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
365 OBD_ALLOC(ld, sizeof(*ld));
369 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
374 /* Don't change the mds_lov_desc until the objids size matches the
376 mds->mds_lov_desc = *ld;
377 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
378 mds->mds_lov_desc.ld_tgt_count);
380 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
381 mds->mds_lov_desc.ld_tgt_count);
383 mds->mds_max_mdsize = lov_mds_md_size(stripes);
384 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
385 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
386 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
389 /* If we added a target we have to reconnect the llogs */
390 /* We only _need_ to do this at first add (idx), or the first time
391 after recovery. However, it should now be safe to call anytime. */
392 rc = llog_cat_initialize(obd, &obd->obd_olg,
393 mds->mds_lov_desc.ld_tgt_count, NULL);
395 /*XXX this notifies the MDD until lov handling use old mds code */
396 if (obd->obd_upcall.onu_owner) {
397 LASSERT(obd->obd_upcall.onu_upcall != NULL);
398 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
399 obd->obd_upcall.onu_owner);
402 OBD_FREE(ld, sizeof(*ld));
407 #define MDSLOV_NO_INDEX -1
409 /* Inform MDS about new/updated target */
410 static int mds_lov_update_mds(struct obd_device *obd,
411 struct obd_device *watched,
414 struct mds_obd *mds = &obd->u.mds;
422 /* Don't let anyone else mess with mds_lov_objids now */
423 mutex_down(&obd->obd_dev_sem);
425 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
429 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
430 idx, obd->obd_recovering, obd->obd_async_recov,
431 mds->mds_lov_desc.ld_tgt_count);
433 /* idx is set as data from lov_notify. */
434 if (obd->obd_recovering)
437 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
438 CERROR("index %d > count %d!\n", idx,
439 mds->mds_lov_desc.ld_tgt_count);
440 GOTO(out, rc = -EINVAL);
443 rc = mds_lov_get_objid(obd, idx);
447 page = idx / OBJID_PER_PAGE();
448 off = idx % OBJID_PER_PAGE();
449 data = mds->mds_lov_page_array[page];
451 /* We have read this lastid from disk; tell the osc.
452 Don't call this during recovery. */
453 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
455 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
456 /* Don't abort the rest of the sync */
459 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
463 mutex_up(&obd->obd_dev_sem);
467 /* update the LOV-OSC knowledge of the last used object id's */
468 int mds_lov_connect(struct obd_device *obd, char * lov_name)
470 struct mds_obd *mds = &obd->u.mds;
471 struct lustre_handle conn = {0,};
472 struct obd_connect_data *data;
476 if (IS_ERR(mds->mds_osc_obd))
477 RETURN(PTR_ERR(mds->mds_osc_obd));
479 if (mds->mds_osc_obd)
482 mds->mds_osc_obd = class_name2obd(lov_name);
483 if (!mds->mds_osc_obd) {
484 CERROR("MDS cannot locate LOV %s\n", lov_name);
485 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
489 OBD_ALLOC(data, sizeof(*data));
492 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
493 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
494 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
496 #ifdef HAVE_LRU_RESIZE_SUPPORT
497 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
499 data->ocd_version = LUSTRE_VERSION_CODE;
500 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
501 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
502 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
503 OBD_FREE(data, sizeof(*data));
505 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
506 mds->mds_osc_obd = ERR_PTR(rc);
509 mds->mds_osc_exp = class_conn2export(&conn);
511 rc = obd_register_observer(mds->mds_osc_obd, obd);
513 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
515 GOTO(err_discon, rc);
518 /* Deny new client connections until we are sure we have some OSTs */
519 obd->obd_no_conn = 1;
521 mutex_down(&obd->obd_dev_sem);
522 rc = mds_lov_read_objids(obd);
524 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
528 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
532 /* tgt_count may be 0! */
533 rc = llog_cat_initialize(obd, &obd->obd_olg,
534 mds->mds_lov_desc.ld_tgt_count, NULL);
536 CERROR("failed to initialize catalog %d\n", rc);
540 /* If we're mounting this code for the first time on an existing FS,
541 * we need to populate the objids array from the real OST values */
542 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
543 __u32 i = mds->mds_lov_objid_count;
544 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
545 rc = mds_lov_get_objid(obd, i);
550 rc = mds_lov_write_objids(obd);
552 CERROR("got last objids from OSTs, but error "
553 "in update objids file: %d\n", rc);
555 mutex_up(&obd->obd_dev_sem);
557 /* I want to see a callback happen when the OBD moves to a
558 * "For General Use" state, and that's when we'll call
559 * set_nextid(). The class driver can help us here, because
560 * it can use the obd_recovering flag to determine when the
561 * the OBD is full available. */
562 /* MDD device will care about that
563 if (!obd->obd_recovering)
564 rc = mds_postrecov(obd);
569 mutex_up(&obd->obd_dev_sem);
570 obd_register_observer(mds->mds_osc_obd, NULL);
572 obd_disconnect(mds->mds_osc_exp);
573 mds->mds_osc_exp = NULL;
574 mds->mds_osc_obd = ERR_PTR(rc);
578 int mds_lov_disconnect(struct obd_device *obd)
580 struct mds_obd *mds = &obd->u.mds;
584 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
585 obd_register_observer(mds->mds_osc_obd, NULL);
587 /* The actual disconnect of the mds_lov will be called from
588 * class_disconnect_exports from mds_lov_clean. So we have to
589 * ensure that class_cleanup doesn't fail due to the extra ref
590 * we're holding now. The mechanism to do that already exists -
591 * the obd_force flag. We'll drop the final ref to the
592 * mds_osc_exp in mds_cleanup. */
593 mds->mds_osc_obd->obd_force = 1;
599 /* Collect the preconditions we need to allow client connects */
600 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
602 if (flag & CONFIG_LOG)
603 obd->u.mds.mds_fl_cfglog = 1;
604 if (flag & CONFIG_SYNC)
605 obd->u.mds.mds_fl_synced = 1;
606 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
607 /* Open for clients */
608 obd->obd_no_conn = 0;
611 struct mds_lov_sync_info {
612 struct obd_device *mlsi_obd; /* the lov device to sync */
613 struct obd_device *mlsi_watched; /* target osc */
614 __u32 mlsi_index; /* index of target */
617 static int mds_propagate_capa_keys(struct mds_obd *mds)
619 struct lustre_capa_key *key;
624 if (!mds->mds_capa_keys)
627 for (i = 0; i < 2; i++) {
628 key = &mds->mds_capa_keys[i];
629 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
631 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
632 KEY_CAPA_KEY, sizeof(*key), key, NULL);
634 DEBUG_CAPA_KEY(D_ERROR, key,
635 "propagate failed (rc = %d) for", rc);
643 /* We only sync one osc at a time, so that we don't have to hold
644 any kind of lock on the whole mds_lov_desc, which may change
645 (grow) as a result of mds_lov_add_ost. This also avoids any
646 kind of mismatch between the lov_desc and the mds_lov_desc,
647 which are not in lock-step during lov_add_obd */
648 static int __mds_lov_synchronize(void *data)
650 struct mds_lov_sync_info *mlsi = data;
651 struct obd_device *obd = mlsi->mlsi_obd;
652 struct obd_device *watched = mlsi->mlsi_watched;
653 struct mds_obd *mds = &obd->u.mds;
654 struct obd_uuid *uuid;
655 __u32 idx = mlsi->mlsi_index;
656 struct mds_group_info mgi;
657 struct llog_ctxt *ctxt;
665 uuid = &watched->u.cli.cl_target_uuid;
668 down_read(&mds->mds_notify_lock);
669 if (obd->obd_stopping || obd->obd_fail)
670 GOTO(out, rc = -ENODEV);
672 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
673 rc = mds_lov_update_mds(obd, watched, idx);
675 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
678 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
681 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
682 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
685 /* propagate capability keys */
686 rc = mds_propagate_capa_keys(mds);
690 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
692 GOTO(out, rc = -ENODEV);
694 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
695 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
699 CERROR("%s failed at llog_origin_connect: %d\n",
700 obd_uuid2str(uuid), rc);
704 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
705 obd->obd_name, obd_uuid2str(uuid));
706 rc = mds_lov_clear_orphans(mds, uuid);
708 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
709 obd_uuid2str(uuid), rc);
713 if (obd->obd_upcall.onu_owner) {
715 * This is a hack for mds_notify->mdd_notify. When the mds obd
716 * in mdd is removed, This hack should be removed.
718 LASSERT(obd->obd_upcall.onu_upcall != NULL);
719 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
720 obd->obd_upcall.onu_owner);
724 up_read(&mds->mds_notify_lock);
726 /* Deactivate it for safety */
727 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
729 if (!obd->obd_stopping && mds->mds_osc_obd &&
730 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
731 obd_notify(mds->mds_osc_obd, watched,
732 OBD_NOTIFY_INACTIVE, NULL);
739 int mds_lov_synchronize(void *data)
741 struct mds_lov_sync_info *mlsi = data;
744 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
745 ptlrpc_daemonize(name);
747 RETURN(__mds_lov_synchronize(data));
750 int mds_lov_start_synchronize(struct obd_device *obd,
751 struct obd_device *watched,
752 void *data, int nonblock)
754 struct mds_lov_sync_info *mlsi;
756 struct mds_obd *mds = &obd->u.mds;
757 struct obd_uuid *uuid;
761 uuid = &watched->u.cli.cl_target_uuid;
763 OBD_ALLOC(mlsi, sizeof(*mlsi));
767 mlsi->mlsi_obd = obd;
768 mlsi->mlsi_watched = watched;
770 mlsi->mlsi_index = *(__u32 *)data;
772 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
774 /* Although class_export_get(obd->obd_self_export) would lock
775 the MDS in place, since it's only a self-export
776 it doesn't lock the LOV in place. The LOV can be disconnected
777 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
778 Simply taking an export ref on the LOV doesn't help, because it's
779 still disconnected. Taking an obd reference insures that we don't
780 disconnect the LOV. This of course means a cleanup won't
781 finish for as long as the sync is blocking. */
785 /* Synchronize in the background */
786 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
787 CLONE_VM | CLONE_FILES);
789 CERROR("%s: error starting mds_lov_synchronize: %d\n",
793 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
794 "thread=%d\n", obd->obd_name,
795 mlsi->mlsi_index, rc);
799 rc = __mds_lov_synchronize((void *)mlsi);
805 int mds_notify(struct obd_device *obd, struct obd_device *watched,
806 enum obd_notify_event ev, void *data)
812 /* We only handle these: */
813 case OBD_NOTIFY_ACTIVE:
814 case OBD_NOTIFY_SYNC:
815 case OBD_NOTIFY_SYNC_NONBLOCK:
817 case OBD_NOTIFY_CONFIG:
818 mds_allow_cli(obd, (unsigned long)data);
823 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
824 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
825 CERROR("unexpected notification of %s %s!\n",
826 watched->obd_type->typ_name, watched->obd_name);
830 if (obd->obd_recovering) {
831 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
833 obd_uuid2str(&watched->u.cli.cl_target_uuid));
834 /* We still have to fix the lov descriptor for ost's added
835 after the mdt in the config log. They didn't make it into
837 mutex_down(&obd->obd_dev_sem);
838 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
840 mutex_up(&obd->obd_dev_sem);
843 /* We should update init llog here too for replay unlink and
844 * possiable llog init race when recovery complete */
845 llog_cat_initialize(obd, &obd->obd_olg,
846 obd->u.mds.mds_lov_desc.ld_tgt_count,
847 &watched->u.cli.cl_target_uuid);
848 mutex_up(&obd->obd_dev_sem);
849 mds_allow_cli(obd, CONFIG_SYNC);
853 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
854 rc = mds_lov_start_synchronize(obd, watched, data,
855 !(ev == OBD_NOTIFY_SYNC));
857 lquota_recovery(mds_quota_interface_ref, obd);