1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
53 #include "mds_internal.h"
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 struct mds_obd *mds = &obd->u.mds;
60 CDEBUG(D_INFO, "dump from %s\n", label);
61 if (mds->mds_lov_page_dirty == NULL) {
62 CERROR("NULL bitmap!\n");
66 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 if (mds->mds_lov_page_array == NULL) {
70 CERROR("not init page array!\n");
74 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75 obd_id *data = mds->mds_lov_page_array[i];
80 for(j=0; j < OBJID_PER_PAGE(); j++) {
83 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
90 int mds_lov_init_objids(struct obd_device *obd)
92 struct mds_obd *mds = &obd->u.mds;
93 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
98 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101 if (mds->mds_lov_page_dirty == NULL)
105 OBD_ALLOC(mds->mds_lov_page_array, size);
106 if (mds->mds_lov_page_array == NULL)
107 GOTO(err_free_bitmap, rc = -ENOMEM);
109 /* open and test the lov objd file */
110 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
113 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114 GOTO(err_free, rc = PTR_ERR(file));
116 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118 file->f_dentry->d_inode->i_mode);
119 GOTO(err_open, rc = -ENOENT);
121 mds->mds_lov_objid_filp = file;
125 if (filp_close((struct file *)file, 0))
126 CERROR("can't close %s after error\n", LOV_OBJID);
128 OBD_FREE(mds->mds_lov_page_array, size);
130 FREE_BITMAP(mds->mds_lov_page_dirty);
135 void mds_lov_destroy_objids(struct obd_device *obd)
137 struct mds_obd *mds = &obd->u.mds;
141 if (mds->mds_lov_page_array != NULL) {
142 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143 obd_id *data = mds->mds_lov_page_array[i];
145 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
147 OBD_FREE(mds->mds_lov_page_array,
148 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
151 if (mds->mds_lov_objid_filp) {
152 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153 mds->mds_lov_objid_filp = NULL;
155 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
158 FREE_BITMAP(mds->mds_lov_page_dirty);
162 static int mds_lov_read_objids(struct obd_device *obd)
164 struct mds_obd *mds = &obd->u.mds;
166 int i, rc, count = 0, page = 0;
170 /* Read everything in the file, even if our current lov desc
171 has fewer targets. Old targets not in the lov descriptor
172 during mds setup may still have valid objids. */
173 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
177 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
180 for (i = 0; i < page; i++) {
181 obd_id *data = mds->mds_lov_page_array[i];
182 loff_t off_old = off;
184 LASSERT(data == NULL);
185 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
187 GOTO(out, rc = -ENOMEM);
189 mds->mds_lov_page_array[i] = data;
191 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192 OBJID_PER_PAGE()*sizeof(obd_id), &off);
194 CERROR("Error reading objids %d\n", rc);
200 count += (off - off_old)/sizeof(obd_id);
202 mds->mds_lov_objid_count = count;
205 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
208 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
211 mds_lov_dump_objids("read",obd);
216 int mds_lov_write_objids(struct obd_device *obd)
218 struct mds_obd *mds = &obd->u.mds;
222 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
225 mds_lov_dump_objids("write", obd);
227 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228 obd_id *data = mds->mds_lov_page_array[i];
229 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230 loff_t off = i * size;
232 LASSERT(data != NULL);
234 /* check for particaly filled last page */
235 if (i == mds->mds_lov_objid_lastpage)
236 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
238 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
242 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
249 EXPORT_SYMBOL(mds_lov_write_objids);
251 static int mds_lov_get_objid(struct obd_device * obd,
254 struct mds_obd *mds = &obd->u.mds;
261 page = idx / OBJID_PER_PAGE();
262 off = idx % OBJID_PER_PAGE();
263 data = mds->mds_lov_page_array[page];
265 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
267 GOTO(out, rc = -ENOMEM);
269 mds->mds_lov_page_array[page] = data;
272 if (data[off] == 0) {
273 /* We never read this lastid; ask the osc */
274 struct obd_id_info lastid;
275 __u32 size = sizeof(lastid);
278 lastid.data = &data[off];
279 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280 KEY_LAST_ID, &size, &lastid);
284 if (idx > mds->mds_lov_objid_count) {
285 mds->mds_lov_objid_count = idx;
286 mds->mds_lov_objid_lastpage = page;
287 mds->mds_lov_objid_lastidx = off;
289 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
299 struct obd_trans_info oti = {0};
300 struct lov_stripe_md *empty_ea = NULL;
303 LASSERT(mds->mds_lov_page_array != NULL);
305 /* This create will in fact either create or destroy: If the OST is
306 * missing objects below this ID, they will be created. If it finds
307 * objects above this ID, they will be removed. */
308 memset(&oa, 0, sizeof(oa));
309 oa.o_flags = OBD_FL_DELORPHAN;
310 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312 if (ost_uuid != NULL) {
313 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
314 oa.o_valid |= OBD_MD_FLINLINE;
316 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
322 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
324 struct mds_obd *mds = &obd->u.mds;
326 struct obd_id_info info;
329 LASSERT(!obd->obd_recovering);
331 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
332 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
336 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
337 KEY_NEXT_ID, sizeof(info), &info, NULL);
339 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
345 static __u32 mds_lov_get_idx(struct obd_export *lov,
346 struct obd_uuid *ost_uuid)
349 int valsize = sizeof(ost_uuid);
351 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
358 /* Update the lov desc for a new size lov. */
359 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
361 struct mds_obd *mds = &obd->u.mds;
363 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
367 OBD_ALLOC(ld, sizeof(*ld));
371 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
376 /* Don't change the mds_lov_desc until the objids size matches the
378 mds->mds_lov_desc = *ld;
379 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
380 mds->mds_lov_desc.ld_tgt_count);
382 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
383 mds->mds_lov_desc.ld_tgt_count);
385 mds->mds_max_mdsize = lov_mds_md_size(stripes);
386 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
387 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
391 /* If we added a target we have to reconnect the llogs */
392 /* We only _need_ to do this at first add (idx), or the first time
393 after recovery. However, it should now be safe to call anytime. */
394 rc = llog_cat_initialize(obd, &obd->obd_olg,
395 mds->mds_lov_desc.ld_tgt_count, NULL);
397 /*XXX this notifies the MDD until lov handling use old mds code */
398 if (obd->obd_upcall.onu_owner) {
399 LASSERT(obd->obd_upcall.onu_upcall != NULL);
400 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
401 obd->obd_upcall.onu_owner);
404 OBD_FREE(ld, sizeof(*ld));
409 #define MDSLOV_NO_INDEX -1
411 /* Inform MDS about new/updated target */
412 static int mds_lov_update_mds(struct obd_device *obd,
413 struct obd_device *watched,
416 struct mds_obd *mds = &obd->u.mds;
424 /* Don't let anyone else mess with mds_lov_objids now */
425 mutex_down(&obd->obd_dev_sem);
427 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
431 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
432 idx, obd->obd_recovering, obd->obd_async_recov,
433 mds->mds_lov_desc.ld_tgt_count);
435 /* idx is set as data from lov_notify. */
436 if (obd->obd_recovering)
439 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
440 CERROR("index %d > count %d!\n", idx,
441 mds->mds_lov_desc.ld_tgt_count);
442 GOTO(out, rc = -EINVAL);
445 rc = mds_lov_get_objid(obd, idx);
449 page = idx / OBJID_PER_PAGE();
450 off = idx % OBJID_PER_PAGE();
451 data = mds->mds_lov_page_array[page];
453 /* We have read this lastid from disk; tell the osc.
454 Don't call this during recovery. */
455 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
457 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
458 /* Don't abort the rest of the sync */
461 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
465 mutex_up(&obd->obd_dev_sem);
469 /* update the LOV-OSC knowledge of the last used object id's */
470 int mds_lov_connect(struct obd_device *obd, char * lov_name)
472 struct mds_obd *mds = &obd->u.mds;
473 struct lustre_handle conn = {0,};
474 struct obd_connect_data *data;
478 if (IS_ERR(mds->mds_osc_obd))
479 RETURN(PTR_ERR(mds->mds_osc_obd));
481 if (mds->mds_osc_obd)
484 mds->mds_osc_obd = class_name2obd(lov_name);
485 if (!mds->mds_osc_obd) {
486 CERROR("MDS cannot locate LOV %s\n", lov_name);
487 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
491 OBD_ALLOC(data, sizeof(*data));
494 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
495 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
496 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
498 #ifdef HAVE_LRU_RESIZE_SUPPORT
499 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
501 data->ocd_version = LUSTRE_VERSION_CODE;
502 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
503 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
504 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
505 OBD_FREE(data, sizeof(*data));
507 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
508 mds->mds_osc_obd = ERR_PTR(rc);
511 mds->mds_osc_exp = class_conn2export(&conn);
513 rc = obd_register_observer(mds->mds_osc_obd, obd);
515 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
517 GOTO(err_discon, rc);
520 /* Deny new client connections until we are sure we have some OSTs */
521 obd->obd_no_conn = 1;
523 mutex_down(&obd->obd_dev_sem);
524 rc = mds_lov_read_objids(obd);
526 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
530 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
534 /* tgt_count may be 0! */
535 rc = llog_cat_initialize(obd, &obd->obd_olg,
536 mds->mds_lov_desc.ld_tgt_count, NULL);
538 CERROR("failed to initialize catalog %d\n", rc);
542 /* If we're mounting this code for the first time on an existing FS,
543 * we need to populate the objids array from the real OST values */
544 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
545 __u32 i = mds->mds_lov_objid_count;
546 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
547 rc = mds_lov_get_objid(obd, i);
552 rc = mds_lov_write_objids(obd);
554 CERROR("got last objids from OSTs, but error "
555 "in update objids file: %d\n", rc);
557 mutex_up(&obd->obd_dev_sem);
559 /* I want to see a callback happen when the OBD moves to a
560 * "For General Use" state, and that's when we'll call
561 * set_nextid(). The class driver can help us here, because
562 * it can use the obd_recovering flag to determine when the
563 * the OBD is full available. */
564 /* MDD device will care about that
565 if (!obd->obd_recovering)
566 rc = mds_postrecov(obd);
571 mutex_up(&obd->obd_dev_sem);
572 obd_register_observer(mds->mds_osc_obd, NULL);
574 obd_disconnect(mds->mds_osc_exp);
575 mds->mds_osc_exp = NULL;
576 mds->mds_osc_obd = ERR_PTR(rc);
580 int mds_lov_disconnect(struct obd_device *obd)
582 struct mds_obd *mds = &obd->u.mds;
586 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
587 obd_register_observer(mds->mds_osc_obd, NULL);
589 /* The actual disconnect of the mds_lov will be called from
590 * class_disconnect_exports from mds_lov_clean. So we have to
591 * ensure that class_cleanup doesn't fail due to the extra ref
592 * we're holding now. The mechanism to do that already exists -
593 * the obd_force flag. We'll drop the final ref to the
594 * mds_osc_exp in mds_cleanup. */
595 mds->mds_osc_obd->obd_force = 1;
601 /* Collect the preconditions we need to allow client connects */
602 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
604 if (flag & CONFIG_LOG)
605 obd->u.mds.mds_fl_cfglog = 1;
606 if (flag & CONFIG_SYNC)
607 obd->u.mds.mds_fl_synced = 1;
608 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
609 /* Open for clients */
610 obd->obd_no_conn = 0;
613 struct mds_lov_sync_info {
614 struct obd_device *mlsi_obd; /* the lov device to sync */
615 struct obd_device *mlsi_watched; /* target osc */
616 __u32 mlsi_index; /* index of target */
619 static int mds_propagate_capa_keys(struct mds_obd *mds)
621 struct lustre_capa_key *key;
626 if (!mds->mds_capa_keys)
629 for (i = 0; i < 2; i++) {
630 key = &mds->mds_capa_keys[i];
631 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
633 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
634 KEY_CAPA_KEY, sizeof(*key), key, NULL);
636 DEBUG_CAPA_KEY(D_ERROR, key,
637 "propagate failed (rc = %d) for", rc);
645 /* We only sync one osc at a time, so that we don't have to hold
646 any kind of lock on the whole mds_lov_desc, which may change
647 (grow) as a result of mds_lov_add_ost. This also avoids any
648 kind of mismatch between the lov_desc and the mds_lov_desc,
649 which are not in lock-step during lov_add_obd */
650 static int __mds_lov_synchronize(void *data)
652 struct mds_lov_sync_info *mlsi = data;
653 struct obd_device *obd = mlsi->mlsi_obd;
654 struct obd_device *watched = mlsi->mlsi_watched;
655 struct mds_obd *mds = &obd->u.mds;
656 struct obd_uuid *uuid;
657 __u32 idx = mlsi->mlsi_index;
658 struct mds_group_info mgi;
659 struct llog_ctxt *ctxt;
667 uuid = &watched->u.cli.cl_target_uuid;
670 down_read(&mds->mds_notify_lock);
671 if (obd->obd_stopping || obd->obd_fail)
672 GOTO(out, rc = -ENODEV);
674 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
675 rc = mds_lov_update_mds(obd, watched, idx);
677 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
680 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
683 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
684 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
687 /* propagate capability keys */
688 rc = mds_propagate_capa_keys(mds);
692 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
694 GOTO(out, rc = -ENODEV);
696 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
697 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
701 CERROR("%s failed at llog_origin_connect: %d\n",
702 obd_uuid2str(uuid), rc);
706 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
707 obd->obd_name, obd_uuid2str(uuid));
708 rc = mds_lov_clear_orphans(mds, uuid);
710 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
711 obd_uuid2str(uuid), rc);
715 if (obd->obd_upcall.onu_owner) {
717 * This is a hack for mds_notify->mdd_notify. When the mds obd
718 * in mdd is removed, This hack should be removed.
720 LASSERT(obd->obd_upcall.onu_upcall != NULL);
721 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
722 obd->obd_upcall.onu_owner);
726 up_read(&mds->mds_notify_lock);
728 /* Deactivate it for safety */
729 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
731 if (!obd->obd_stopping && mds->mds_osc_obd &&
732 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
733 obd_notify(mds->mds_osc_obd, watched,
734 OBD_NOTIFY_INACTIVE, NULL);
741 int mds_lov_synchronize(void *data)
743 struct mds_lov_sync_info *mlsi = data;
746 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
747 ptlrpc_daemonize(name);
749 RETURN(__mds_lov_synchronize(data));
752 int mds_lov_start_synchronize(struct obd_device *obd,
753 struct obd_device *watched,
754 void *data, int nonblock)
756 struct mds_lov_sync_info *mlsi;
758 struct mds_obd *mds = &obd->u.mds;
759 struct obd_uuid *uuid;
763 uuid = &watched->u.cli.cl_target_uuid;
765 OBD_ALLOC(mlsi, sizeof(*mlsi));
769 mlsi->mlsi_obd = obd;
770 mlsi->mlsi_watched = watched;
772 mlsi->mlsi_index = *(__u32 *)data;
774 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
776 /* Although class_export_get(obd->obd_self_export) would lock
777 the MDS in place, since it's only a self-export
778 it doesn't lock the LOV in place. The LOV can be disconnected
779 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
780 Simply taking an export ref on the LOV doesn't help, because it's
781 still disconnected. Taking an obd reference insures that we don't
782 disconnect the LOV. This of course means a cleanup won't
783 finish for as long as the sync is blocking. */
787 /* Synchronize in the background */
788 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
789 CLONE_VM | CLONE_FILES);
791 CERROR("%s: error starting mds_lov_synchronize: %d\n",
795 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
796 "thread=%d\n", obd->obd_name,
797 mlsi->mlsi_index, rc);
801 rc = __mds_lov_synchronize((void *)mlsi);
807 int mds_notify(struct obd_device *obd, struct obd_device *watched,
808 enum obd_notify_event ev, void *data)
814 /* We only handle these: */
815 case OBD_NOTIFY_ACTIVE:
816 case OBD_NOTIFY_SYNC:
817 case OBD_NOTIFY_SYNC_NONBLOCK:
819 case OBD_NOTIFY_CONFIG:
820 mds_allow_cli(obd, (unsigned long)data);
825 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
826 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
827 CERROR("unexpected notification of %s %s!\n",
828 watched->obd_type->typ_name, watched->obd_name);
832 if (obd->obd_recovering) {
833 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
835 obd_uuid2str(&watched->u.cli.cl_target_uuid));
836 /* We still have to fix the lov descriptor for ost's added
837 after the mdt in the config log. They didn't make it into
839 mutex_down(&obd->obd_dev_sem);
840 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
842 mutex_up(&obd->obd_dev_sem);
845 /* We should update init llog here too for replay unlink and
846 * possiable llog init race when recovery complete */
847 llog_cat_initialize(obd, &obd->obd_olg,
848 obd->u.mds.mds_lov_desc.ld_tgt_count,
849 &watched->u.cli.cl_target_uuid);
850 mutex_up(&obd->obd_dev_sem);
851 mds_allow_cli(obd, CONFIG_SYNC);
855 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
856 rc = mds_lov_start_synchronize(obd, watched, data,
857 !(ev == OBD_NOTIFY_SYNC));
859 lquota_recovery(mds_quota_interface_ref, obd);