1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 #define DEBUG_SUBSYSTEM S_MDS
31 #include <linux/module.h>
32 #include <lustre_mds.h>
33 #include <lustre/lustre_idl.h>
34 #include <obd_class.h>
36 #include <lustre_lib.h>
37 #include <lustre_fsfilt.h>
39 #include "mds_internal.h"
/*
 * Debug helper: logs the dirty-page bitmap and every objid page of the
 * MDS through CDEBUG(D_INFO), preceded by a line naming @label.
 * Complains through CERROR when the bitmap or the page array is NULL.
 * NOTE(review): the embedded line numbers skip values (42, 44-45, ...),
 * so the declarations and the skip/exit paths are not visible here.
 */
41 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
43 struct mds_obd *mds = &obd->u.mds;
46 CDEBUG(D_INFO, "dump from %s\n", label);
47 if (mds->mds_lov_page_dirty == NULL) {
48 CERROR("NULL bitmap!\n");
/* One CDEBUG line per long-sized word: size/BITS_PER_LONG + 1 words. */
52 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
53 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
55 if (mds->mds_lov_page_array == NULL) {
56 CERROR("not init page array!\n");
/* Visit every page slot, then every objid slot of that page. */
60 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
61 obd_id *data = mds->mds_lov_page_array[i];
66 for(j=0; j < OBJID_PER_PAGE(); j++) {
69 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Set up the objid bookkeeping of the MDS: allocate the dirty-page bitmap
 * and the array of page pointers, then open (creating it if needed) the
 * LOV_OBJID file and remember it in mds->mds_lov_objid_filp.
 * The visible error paths use rc = -ENOMEM, PTR_ERR(file) and -ENOENT.
 * NOTE(review): labels, braces and some statements are missing from this
 * listing; the unwind order below is read from the visible lines only.
 */
76 int mds_lov_init_objids(struct obd_device *obd)
78 struct mds_obd *mds = &obd->u.mds;
/* Byte size of the page pointer array: one pointer per objid page. */
79 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* An allocation unit must hold a whole number of obd_id entries. */
84 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
86 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
87 if (mds->mds_lov_page_dirty == NULL)
91 OBD_ALLOC(mds->mds_lov_page_array, size);
92 if (mds->mds_lov_page_array == NULL)
93 GOTO(err_free_bitmap, rc = -ENOMEM);
95 /* open and test the lov objid file */
96 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/*
 * NOTE(review): rc is printed here, yet the only visible assignment of
 * PTR_ERR(file) to rc is in the GOTO on the following line; confirm that
 * rc is set before this message is emitted.
 */
99 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
100 GOTO(err_free, rc = PTR_ERR(file));
/* Anything but a regular file is rejected with -ENOENT. */
102 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
103 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
104 file->f_dentry->d_inode->i_mode);
105 GOTO(err_open, rc = -ENOENT);
107 mds->mds_lov_objid_filp = file;
/* Error unwinding: close the file, free the page array, free the bitmap. */
111 if (filp_close((struct file *)file, 0))
112 CERROR("can't close %s after error\n", LOV_OBJID);
114 OBD_FREE(mds->mds_lov_page_array, size);
116 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Tear down the objid bookkeeping: free the page buffers and the page
 * pointer array, close the objid file and free the dirty-page bitmap.
 */
121 void mds_lov_destroy_objids(struct obd_device *obd)
123 struct mds_obd *mds = &obd->u.mds;
127 if (mds->mds_lov_page_array != NULL) {
128 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
129 obd_id *data = mds->mds_lov_page_array[i];
/*
 * NOTE(review): in the visible lines OBD_FREE runs for every slot; a NULL
 * check for data is presumably on line 130, which is not shown - confirm.
 */
131 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
133 OBD_FREE(mds->mds_lov_page_array,
134 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
137 if (mds->mds_lov_objid_filp) {
138 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
/* The file pointer is cleared before a failed close is reported. */
139 mds->mds_lov_objid_filp = NULL;
141 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
144 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Load the contents of the objid file into freshly allocated page buffers
 * and derive mds_lov_objid_count, mds_lov_objid_lastpage and
 * mds_lov_objid_lastidx from the number of entries read.
 */
148 static int mds_lov_read_objids(struct obd_device *obd)
150 struct mds_obd *mds = &obd->u.mds;
152 int i, rc, count = 0, page = 0;
156 /* Read everything in the file, even if our current lov desc
157 has fewer targets. Old targets not in the lov descriptor
158 during mds setup may still have valid objids. */
159 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/*
 * Pages to read: the number of whole pages in the file, plus one.
 * NOTE(review): no check of page against MDS_LOV_OBJID_PAGES_COUNT is
 * visible before mds_lov_page_array is indexed below - confirm a bound
 * exists on a line not shown here.
 */
163 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
164 CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
165 for(i=0; i < page; i++) {
166 obd_id *data = mds->mds_lov_page_array[i];
167 loff_t off_old = off;
/* Every slot is expected to be unused before it is filled from the file. */
169 LASSERT(data == NULL);
170 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
172 GOTO(out, rc = -ENOMEM);
174 mds->mds_lov_page_array[i] = data;
176 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
177 OBJID_PER_PAGE()*sizeof(obd_id), &off);
179 CERROR("Error reading objids %d\n", rc);
/* Entries read in this pass = how far the file offset advanced. */
185 count += (off-off_old)/sizeof(obd_id);
187 mds->mds_lov_objid_count = count;
/* Split the entry count into a page number and an index within that page. */
190 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
191 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
193 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
194 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
196 mds_lov_dump_objids("read",obd);
/*
 * Write every page flagged in the dirty bitmap back to the objid file and
 * clear its dirty bit.  The function is exported through EXPORT_SYMBOL.
 * NOTE(review): the statement taken when the bitmap is empty and the
 * handling of rc are on lines missing from this listing.
 */
201 int mds_lov_write_objids(struct obd_device *obd)
203 struct mds_obd *mds = &obd->u.mds;
207 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
210 mds_lov_dump_objids("write", obd);
212 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
213 obd_id *data = mds->mds_lov_page_array[i];
/* Default write size is a full page; the file offset is page index times that size. */
214 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
215 loff_t off = i * size;
217 LASSERT(data != NULL);
219 /* check for partially filled last page */
/* The last page is written up to and including mds_lov_objid_lastidx. */
220 if (i == mds->mds_lov_objid_lastpage)
221 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
223 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
227 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
234 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Prepare the objid slot for target index idx: find its page and offset,
 * allocate the page (the guarding condition is not visible), ask the OSC
 * for the last id when the slot still holds 0, extend the
 * count/lastpage/lastidx bookkeeping when idx is beyond the current count,
 * and flag the page in the dirty bitmap.
 * NOTE(review): the rest of the parameter list and several conditions are
 * on lines missing from this listing.
 */
236 static int mds_lov_get_objid(struct obd_device * obd,
239 struct mds_obd *mds = &obd->u.mds;
/* Split the target index into a page number and an offset inside the page. */
246 page = idx / OBJID_PER_PAGE();
247 off = idx % OBJID_PER_PAGE();
248 data = mds->mds_lov_page_array[page];
250 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
252 GOTO(out, rc = -ENOMEM);
254 mds->mds_lov_page_array[page] = data;
257 if (data[off] == 0) {
258 /* We never read this lastid; ask the osc */
259 struct obd_id_info lastid;
260 __u32 size = sizeof(lastid);
/* Hand the address of the slot to the request. */
263 lastid.data = &data[off];
264 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
265 KEY_LAST_ID, &size, &lastid);
/* Grow the recorded extent when this index lies beyond the known count. */
269 if (idx > mds->mds_lov_objid_count) {
270 mds->mds_lov_objid_count = idx;
271 mds->mds_lov_objid_lastpage = page;
272 mds->mds_lov_objid_lastidx = off;
274 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Send an obd_create carrying OBD_FL_DELORPHAN to the OSC export of the MDS.
 * @ost_uuid: when not NULL it is copied into oa.o_inline and
 *            OBD_MD_FLINLINE is added to oa.o_valid.
 */
280 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
284 struct obd_trans_info oti = {0};
285 struct lov_stripe_md *empty_ea = NULL;
/* The objid page array must already be set up. */
288 LASSERT(mds->mds_lov_page_array != NULL);
290 /* This create will in fact either create or destroy: If the OST is
291 * missing objects below this ID, they will be created. If it finds
292 * objects above this ID, they will be removed. */
293 memset(&oa, 0, sizeof(oa));
294 oa.o_flags = OBD_FL_DELORPHAN;
/* The object group is FILTER_GROUP_MDS0 offset by the id of this MDS. */
295 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
296 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
297 if (ost_uuid != NULL) {
298 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
299 oa.o_valid |= OBD_MD_FLINLINE;
301 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Push one id to the OSC export with the KEY_NEXT_ID key through
 * obd_set_info_async.  Asserts that the obd is not recovering and that
 * obd_dev_sem is locked.
 * NOTE(review): the lines that fill in info from idx and id are not
 * visible in this listing.
 */
307 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
309 struct mds_obd *mds = &obd->u.mds;
311 struct obd_id_info info;
314 LASSERT(!obd->obd_recovering);
316 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
317 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
321 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
322 KEY_NEXT_ID, sizeof(info), &info, NULL);
324 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Query the lov export with the KEY_LOV_IDX key for ost_uuid; the caller
 * in this file stores the return value as a target index.
 * NOTE(review): valsize is initialised with sizeof(ost_uuid), which is the
 * size of the pointer rather than of struct obd_uuid; confirm this is what
 * the KEY_LOV_IDX handler expects.
 */
330 static __u32 mds_lov_get_idx(struct obd_export *lov,
331 struct obd_uuid *ost_uuid)
334 int valsize = sizeof(ost_uuid);
336 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
343 /* Update the lov desc for a new size lov. */
/*
 * Fetches the descriptor with the KEY_LOVDESC key into a temporary buffer,
 * copies it over mds->mds_lov_desc, recomputes mds_max_mdsize and
 * mds_max_cookiesize for min(LOV_MAX_STRIPE_COUNT, ld_tgt_count) stripes,
 * calls llog_cat_initialize for the new target count, invokes the upcall
 * when obd_upcall.onu_owner is set, and frees the temporary buffer.
 * NOTE(review): the max_mdsize/max_cookiesize CDEBUG has the stripe count
 * as its first conversion, but the first arguments visible are
 * mds_max_mdsize and mds_max_cookiesize; check the argument order.
 */
344 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
346 struct mds_obd *mds = &obd->u.mds;
/* valsize starts as the size of the descriptor kept in the mds. */
348 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
/* Temporary descriptor buffer, released at the end of the visible code. */
352 OBD_ALLOC(ld, sizeof(*ld));
356 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
361 /* Don't change the mds_lov_desc until the objids size matches the
363 mds->mds_lov_desc = *ld;
364 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
365 mds->mds_lov_desc.ld_tgt_count);
367 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
368 mds->mds_lov_desc.ld_tgt_count);
370 mds->mds_max_mdsize = lov_mds_md_size(stripes);
371 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
372 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
373 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
376 /* If we added a target we have to reconnect the llogs */
377 /* We only _need_ to do this at first add (idx), or the first time
378 after recovery. However, it should now be safe to call anytime. */
379 rc = llog_cat_initialize(obd, &obd->obd_olg,
380 mds->mds_lov_desc.ld_tgt_count, NULL);
382 /*XXX this notifies the MDD until lov handling use old mds code */
383 if (obd->obd_upcall.onu_owner) {
384 LASSERT(obd->obd_upcall.onu_upcall != NULL);
385 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
386 obd->obd_upcall.onu_owner);
389 OBD_FREE(ld, sizeof(*ld));
/* Sentinel index value. NOTE(review): its users are not visible in this listing. */
394 #define MDSLOV_NO_INDEX -1
396 /* Inform MDS about new/updated target */
/*
 * Runs under obd->obd_dev_sem: refreshes the lov descriptor, rejects an
 * idx that is not below ld_tgt_count with -EINVAL, calls mds_lov_get_objid
 * for idx, and passes the resulting slot to mds_lov_set_one_nextid.
 * NOTE(review): the remaining parameters, the action taken while
 * obd_recovering is set, and the out label are on lines not shown here.
 */
397 static int mds_lov_update_mds(struct obd_device *obd,
398 struct obd_device *watched,
401 struct mds_obd *mds = &obd->u.mds;
409 /* Don't let anyone else mess with mds_lov_objids now */
410 mutex_down(&obd->obd_dev_sem);
412 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
416 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
417 idx, obd->obd_recovering, obd->obd_async_recov,
418 mds->mds_lov_desc.ld_tgt_count);
420 /* idx is set as data from lov_notify. */
421 if (obd->obd_recovering)
/* NOTE(review): the test is >= while the message below prints >. */
424 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
425 CERROR("index %d > count %d!\n", idx,
426 mds->mds_lov_desc.ld_tgt_count);
427 GOTO(out, rc = -EINVAL);
430 rc = mds_lov_get_objid(obd, idx);
/* Locate the slot for idx inside the page array. */
434 page = idx / OBJID_PER_PAGE();
435 off = idx % OBJID_PER_PAGE();
436 data = mds->mds_lov_page_array[page];
438 /* We have read this lastid from disk; tell the osc.
439 Don't call this during recovery. */
440 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
442 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
443 /* Don't abort the rest of the sync */
446 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/* Release the semaphore taken at the top. */
450 mutex_up(&obd->obd_dev_sem);
454 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Connect the MDS to the LOV named lov_name.
 * Visible steps: return a previously cached ERR_PTR failure; look the
 * device up by name; obd_connect with the flags and group set below;
 * register obd as observer of the LOV; set obd_no_conn; then, under
 * obd_dev_sem, read the objid file, refresh the lov descriptor, initialise
 * the llog catalog, fetch objids for targets beyond the stored count and
 * write the objid file.  Failures are remembered in mds->mds_osc_obd as
 * an ERR_PTR, and the last visible lines unregister the observer,
 * disconnect the export and clear mds_osc_exp.
 * NOTE(review): labels, braces and several conditions are on lines missing
 * from this listing.
 */
455 int mds_lov_connect(struct obd_device *obd, char * lov_name)
457 struct mds_obd *mds = &obd->u.mds;
458 struct lustre_handle conn = {0,};
459 struct obd_connect_data *data;
/* mds_osc_obd holds an ERR_PTR after an earlier failure; report it again. */
463 if (IS_ERR(mds->mds_osc_obd))
464 RETURN(PTR_ERR(mds->mds_osc_obd));
466 if (mds->mds_osc_obd)
469 mds->mds_osc_obd = class_name2obd(lov_name);
470 if (!mds->mds_osc_obd) {
471 CERROR("MDS cannot locate LOV %s\n", lov_name);
472 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
476 OBD_ALLOC(data, sizeof(*data));
479 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
480 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
481 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID;
482 #ifdef HAVE_LRU_RESIZE_SUPPORT
483 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
485 data->ocd_version = LUSTRE_VERSION_CODE;
486 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
487 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
488 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
/* The connect data is released right after the call. */
489 OBD_FREE(data, sizeof(*data));
491 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
492 mds->mds_osc_obd = ERR_PTR(rc);
/* Turn the connection handle into the export stored in mds_osc_exp. */
495 mds->mds_osc_exp = class_conn2export(&conn);
497 rc = obd_register_observer(mds->mds_osc_obd, obd);
499 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
501 GOTO(err_discon, rc);
504 /* Deny new client connections until we are sure we have some OSTs */
505 obd->obd_no_conn = 1;
507 mutex_down(&obd->obd_dev_sem);
508 rc = mds_lov_read_objids(obd);
510 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
514 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
518 /* tgt_count may be 0! */
519 rc = llog_cat_initialize(obd, &obd->obd_olg,
520 mds->mds_lov_desc.ld_tgt_count, NULL);
522 CERROR("failed to initialize catalog %d\n", rc);
526 /* If we're mounting this code for the first time on an existing FS,
527 * we need to populate the objids array from the real OST values */
528 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
529 __u32 i = mds->mds_lov_objid_count;
/*
 * NOTE(review): the bound is inclusive (i <= ld_tgt_count), so index
 * ld_tgt_count itself is requested too; confirm this is intended.
 */
530 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
531 rc = mds_lov_get_objid(obd, i);
536 rc = mds_lov_write_objids(obd);
538 CERROR("got last objids from OSTs, but error "
539 "in update objids file: %d\n", rc);
541 mutex_up(&obd->obd_dev_sem);
543 /* I want to see a callback happen when the OBD moves to a
544 * "For General Use" state, and that's when we'll call
545 * set_nextid(). The class driver can help us here, because
546 * it can use the obd_recovering flag to determine when the
547 * OBD is fully available. */
548 /* MDD device will care about that
549 if (!obd->obd_recovering)
550 rc = mds_postrecov(obd);
555 mutex_up(&obd->obd_dev_sem);
556 obd_register_observer(mds->mds_osc_obd, NULL);
558 obd_disconnect(mds->mds_osc_exp);
559 mds->mds_osc_exp = NULL;
560 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * When the LOV device pointer is not an error pointer and an export
 * exists, unregister the observer and set obd_force on the LOV device.
 */
564 int mds_lov_disconnect(struct obd_device *obd)
566 struct mds_obd *mds = &obd->u.mds;
570 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
571 obd_register_observer(mds->mds_osc_obd, NULL);
573 /* The actual disconnect of the mds_lov will be called from
574 * class_disconnect_exports from mds_lov_clean. So we have to
575 * ensure that class_cleanup doesn't fail due to the extra ref
576 * we're holding now. The mechanism to do that already exists -
577 * the obd_force flag. We'll drop the final ref to the
578 * mds_osc_exp in mds_cleanup. */
579 mds->mds_osc_obd->obd_force = 1;
585 /* Collect the preconditions we need to allow client connects */
/*
 * Latches mds_fl_cfglog for CONFIG_LOG and mds_fl_synced for CONFIG_SYNC.
 * Clients are admitted (obd_no_conn = 0) as soon as mds_fl_cfglog is set;
 * the additional mds_fl_synced test is commented out (bz11778).
 */
586 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
588 if (flag & CONFIG_LOG)
589 obd->u.mds.mds_fl_cfglog = 1;
590 if (flag & CONFIG_SYNC)
591 obd->u.mds.mds_fl_synced = 1;
592 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
593 /* Open for clients */
594 obd->obd_no_conn = 0;
/*
 * Arguments handed to __mds_lov_synchronize: allocated and filled in
 * mds_lov_start_synchronize, freed by __mds_lov_synchronize.
 */
597 struct mds_lov_sync_info {
598 struct obd_device *mlsi_obd; /* the lov device to sync */
599 struct obd_device *mlsi_watched; /* target osc */
600 __u32 mlsi_index; /* index of target */
/*
 * Send the first two entries of mds->mds_capa_keys to the OSC export with
 * the KEY_CAPA_KEY key.
 * NOTE(review): the key length is passed as strlen(KEY_CAPA_KEY), while the
 * other obd_set_info_async and obd_get_info calls in this file pass
 * sizeof(KEY_...); confirm which length the receiver expects.
 * NOTE(review): what happens when mds_capa_keys is NULL is on a line not
 * shown in this listing.
 */
603 static int mds_propagate_capa_keys(struct mds_obd *mds)
605 struct lustre_capa_key *key;
610 if (!mds->mds_capa_keys)
/* Exactly two keys are propagated. */
613 for (i = 0; i < 2; i++) {
614 key = &mds->mds_capa_keys[i];
615 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
617 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
618 KEY_CAPA_KEY, sizeof(*key), key, NULL);
620 DEBUG_CAPA_KEY(D_ERROR, key,
621 "propagate failed (rc = %d) for", rc);
629 /* We only sync one osc at a time, so that we don't have to hold
630 any kind of lock on the whole mds_lov_desc, which may change
631 (grow) as a result of mds_lov_add_ost. This also avoids any
632 kind of mismatch between the lov_desc and the mds_lov_desc,
633 which are not in lock-step during lov_add_obd */
/*
 * Body of one sync pass for a single watched target.  Visible order:
 * copy the fields out of mlsi and free it, mds_lov_update_mds,
 * KEY_MDS_CONN with the group of this MDS, capability keys, llog_connect,
 * mds_lov_clear_orphans (rc = -ENODEV instead when obd_stopping is set),
 * then the upcall when an owner is registered.  The tail of the function
 * logs a sync failure and notifies the LOV with OBD_NOTIFY_INACTIVE.
 * NOTE(review): labels and the rc checks are on lines not shown here.
 */
634 static int __mds_lov_synchronize(void *data)
636 struct mds_lov_sync_info *mlsi = data;
637 struct obd_device *obd = mlsi->mlsi_obd;
638 struct obd_device *watched = mlsi->mlsi_watched;
639 struct mds_obd *mds = &obd->u.mds;
640 struct obd_uuid *uuid;
641 __u32 idx = mlsi->mlsi_index;
642 struct mds_group_info mgi;
643 struct llog_ctxt *ctxt;
/* The needed fields were copied into locals above, so mlsi can go now. */
647 OBD_FREE(mlsi, sizeof(*mlsi));
651 uuid = &watched->u.cli.cl_target_uuid;
654 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
656 rc = mds_lov_update_mds(obd, watched, idx);
658 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
662 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
665 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
666 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
669 /* propagate capability keys */
670 rc = mds_propagate_capa_keys(mds);
674 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
678 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
680 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
684 CERROR("%s failed at llog_origin_connect: %d\n",
685 obd_uuid2str(uuid), rc);
689 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
690 obd->obd_name, obd_uuid2str(uuid));
692 * FIXME: this obd_stopping was useless,
693 * since obd in mdt layer was set
695 if (obd->obd_stopping)
696 GOTO(out, rc = -ENODEV);
698 rc = mds_lov_clear_orphans(mds, uuid);
700 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
701 obd_uuid2str(uuid), rc);
705 if (obd->obd_upcall.onu_owner) {
707 * This is a hack for mds_notify->mdd_notify. When the mds obd
708 * in mdd is removed, This hack should be removed.
710 LASSERT(obd->obd_upcall.onu_upcall != NULL);
711 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
712 obd->obd_upcall.onu_owner);
717 /* Deactivate it for safety */
718 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* Only notify while none of the involved devices is stopping. */
720 if (!obd->obd_stopping && mds->mds_osc_obd &&
721 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
722 obd_notify(mds->mds_osc_obd, watched,
723 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Thread function passed to cfs_kernel_thread in this file: names the
 * thread ll_sync_NN after the target index, calls ptlrpc_daemonize and
 * runs __mds_lov_synchronize on the same argument.
 */
730 int mds_lov_synchronize(void *data)
732 struct mds_lov_sync_info *mlsi = data;
735 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
736 ptlrpc_daemonize(name);
738 RETURN(__mds_lov_synchronize(data));
/*
 * Package obd, watched and the target index into a mds_lov_sync_info and
 * run the sync either in a new kernel thread or directly in the caller.
 * The index is read from data or looked up with mds_lov_get_idx.
 * NOTE(review): the conditions selecting between those alternatives
 * (presumably data != NULL and nonblock) are on lines not shown here.
 */
741 int mds_lov_start_synchronize(struct obd_device *obd,
742 struct obd_device *watched,
743 void *data, int nonblock)
745 struct mds_lov_sync_info *mlsi;
747 struct mds_obd *mds = &obd->u.mds;
748 struct obd_uuid *uuid;
752 uuid = &watched->u.cli.cl_target_uuid;
754 OBD_ALLOC(mlsi, sizeof(*mlsi));
758 mlsi->mlsi_obd = obd;
759 mlsi->mlsi_watched = watched;
761 mlsi->mlsi_index = *(__u32 *)data;
763 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
765 /* Although class_export_get(obd->obd_self_export) would lock
766 the MDS in place, since it's only a self-export
767 it doesn't lock the LOV in place. The LOV can be disconnected
768 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
769 Simply taking an export ref on the LOV doesn't help, because it's
770 still disconnected. Taking an obd reference ensures that we don't
771 disconnect the LOV. This of course means a cleanup won't
772 finish for as long as the sync is blocking. */
776 /* Synchronize in the background */
777 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
778 CLONE_VM | CLONE_FILES);
780 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/*
 * NOTE(review): __mds_lov_synchronize frees mlsi early on, so reading
 * mlsi->mlsi_index after the thread has been started may race with that
 * free; confirm.
 */
784 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
785 "thread=%d\n", obd->obd_name,
786 mlsi->mlsi_index, rc);
/* Run the sync in the calling context. */
790 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification handler of the MDS for events on a watched device.
 * OBD_NOTIFY_CONFIG feeds data to mds_allow_cli as a flag word.  For the
 * other handled events the type name of the watched device is compared
 * with LUSTRE_OSC_NAME.  While obd_recovering is set the lov descriptor
 * and llog catalog are refreshed and CONFIG_SYNC is recorded; the lines
 * after that block start a sync (non-blocking unless ev is
 * OBD_NOTIFY_SYNC) and call lquota_recovery.
 * NOTE(review): the switch statement, returns and braces are on lines not
 * shown in this listing.
 */
796 int mds_notify(struct obd_device *obd, struct obd_device *watched,
797 enum obd_notify_event ev, void *data)
803 /* We only handle these: */
804 case OBD_NOTIFY_ACTIVE:
805 case OBD_NOTIFY_SYNC:
806 case OBD_NOTIFY_SYNC_NONBLOCK:
808 case OBD_NOTIFY_CONFIG:
/*
 * NOTE(review): data is a pointer cast to unsigned int; where pointers are
 * wider than int this truncates - confirm callers only pass small flag
 * values.
 */
809 mds_allow_cli(obd, (unsigned int)data);
814 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
815 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
816 CERROR("unexpected notification of %s %s!\n",
817 watched->obd_type->typ_name, watched->obd_name);
821 if (obd->obd_recovering) {
822 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
824 obd_uuid2str(&watched->u.cli.cl_target_uuid));
825 /* We still have to fix the lov descriptor for ost's added
826 after the mdt in the config log. They didn't make it into
828 mutex_down(&obd->obd_dev_sem);
829 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
831 mutex_up(&obd->obd_dev_sem);
834 /* We should update init llog here too for replay unlink and
835 * possible llog init race when recovery complete */
836 llog_cat_initialize(obd, &obd->obd_olg,
837 obd->u.mds.mds_lov_desc.ld_tgt_count,
838 &watched->u.cli.cl_target_uuid);
839 mutex_up(&obd->obd_dev_sem);
840 mds_allow_cli(obd, CONFIG_SYNC);
844 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
/* The last argument is nonblock: true for every event except OBD_NOTIFY_SYNC. */
845 rc = mds_lov_start_synchronize(obd, watched, data,
846 !(ev == OBD_NOTIFY_SYNC));
848 lquota_recovery(mds_quota_interface_ref, obd);