1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: log the dirty-page bitmap words and the cached objid pages.
 *
 * \param label  tag echoed in the "dump from" message
 * \param obd    device whose u.mds state is dumped
 *
 * A NULL bitmap or a NULL page array is reported with CERROR; all other
 * output goes to CDEBUG(D_INFO).
 * NOTE(review): what follows each CERROR (skip vs. return) and any filter
 * on empty pages or empty slots is not visible in this view - confirm
 * before assuming the loops below are never reached with NULL pointers.
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
/* one log line per bitmap word; the bound is size/BITS_PER_LONG + 1 words */
67 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
70 if (mds->mds_lov_page_array == NULL) {
71 CERROR("not init page array!\n");
/* walk every page pointer, then every objid slot inside that page */
75 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
76 obd_id *data = mds->mds_lov_page_array[i];
81 for(j=0; j < OBJID_PER_PAGE(); j++) {
84 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Set up the in-memory objid tracking state and open the LOV_OBJID file.
 *
 * Allocates the dirty-page bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the
 * array of MDS_LOV_OBJID_PAGES_COUNT page pointers, opens LOV_OBJID with
 * O_RDWR | O_CREAT, checks that it is a regular file and stores the handle
 * in mds->mds_lov_objid_filp.
 *
 * \param obd  device whose u.mds state is initialised
 *
 * Error codes set on the visible failure paths: -ENOMEM when the page
 * array cannot be allocated, PTR_ERR(file) when the open fails, -ENOENT
 * when the file is not a regular file.
 * NOTE(review): the success return value and the code used when the bitmap
 * allocation fails are not visible in this view.
 */
91 int mds_lov_init_objids(struct obd_device *obd)
93 struct mds_obd *mds = &obd->u.mds;
/* byte size of the page-pointer array; reused by the OBD_FREE on unwind */
94 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* a page allocation must hold a whole number of obd_id entries */
99 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
101 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
102 if (mds->mds_lov_page_dirty == NULL)
106 OBD_ALLOC(mds->mds_lov_page_array, size);
107 if (mds->mds_lov_page_array == NULL)
108 GOTO(err_free_bitmap, rc = -ENOMEM);
110 /* open and test the lov objid file */
111 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
114 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
115 GOTO(err_free, rc = PTR_ERR(file));
117 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
118 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
119 file->f_dentry->d_inode->i_mode);
120 GOTO(err_open, rc = -ENOENT);
122 mds->mds_lov_objid_filp = file;
/*
 * Error unwind, in reverse order of acquisition: close the file, free the
 * page array, free the bitmap. Presumably entered through the err_open,
 * err_free and err_free_bitmap labels named in the GOTOs above - the labels
 * themselves are not visible here.
 */
126 if (filp_close((struct file *)file, 0))
127 CERROR("can't close %s after error\n", LOV_OBJID);
129 OBD_FREE(mds->mds_lov_page_array, size);
131 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Tear down the objid tracking state: free the cached objid pages and the
 * page-pointer array, close the LOV_OBJID file handle and free the dirty
 * bitmap.
 *
 * \param obd  device whose u.mds state is released
 *
 * NOTE(review): whether each page pointer is tested for NULL before
 * OBD_FREE is not visible in this view.
 */
136 void mds_lov_destroy_objids(struct obd_device *obd)
138 struct mds_obd *mds = &obd->u.mds;
142 if (mds->mds_lov_page_array != NULL) {
143 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
144 obd_id *data = mds->mds_lov_page_array[i];
146 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
148 OBD_FREE(mds->mds_lov_page_array,
149 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
/* the handle is cleared right after the close; a close failure is only logged */
152 if (mds->mds_lov_objid_filp) {
153 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
154 mds->mds_lov_objid_filp = NULL;
156 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
159 FREE_BITMAP(mds->mds_lov_page_dirty);
164 * There are currently two ways to learn the OST count and the maximum OST index:
165 * first - after an OST has connected to the MDS and the sync process has finished;
166 * second - from the lmm during recovery, when the MDS has no configs
167 * and the OST is not registered with the MGS.
169 * \param mds pointer to mds structure
170 * \param index maximum ost index
172 * \retval -ENOMEM if there is not enough memory for a new page
173 * \retval 0 if the update succeeded
175 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* split the OST index into a page number and a slot within that page */
177 __u32 page = index / OBJID_PER_PAGE();
178 __u32 off = index % OBJID_PER_PAGE();
/*
 * NOTE(review): no check of page against MDS_LOV_OBJID_PAGES_COUNT is
 * visible before this array access - confirm callers bound the index.
 */
179 obd_id *data = mds->mds_lov_page_array[page];
/*
 * A page is allocated and published in the page array; presumably only
 * when the slot above was still NULL (the guard is not visible here).
 */
182 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186 mds->mds_lov_page_array[page] = data;
/* remember the position of the highest index seen so far */
189 if (index > mds->mds_lov_objid_max_index) {
190 mds->mds_lov_objid_lastpage = page;
191 mds->mds_lov_objid_lastidx = off;
192 mds->mds_lov_objid_max_index = index;
195 /* workaround - New target not in objids file; increase mdsize */
196 /* ld_tgt_count is used as the max index everywhere, despite its name. */
197 if (data[off] == 0) {
/* stripe count used for sizing is capped at LOV_MAX_STRIPE_COUNT */
201 mds->mds_lov_objid_count++;
202 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
203 mds->mds_lov_objid_count);
205 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
206 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
208 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
209 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
210 mds->mds_max_cookiesize);
/*
 * Make sure an objid slot exists for every OST index referenced by \a lmm.
 *
 * Reads the stripe count and the object array according to lmm_magic (the
 * lov_mds_md_v1 or lov_mds_md_v3 layout), then calls
 * mds_lov_update_max_ost() for each stripe's l_ost_idx while holding
 * obd->obd_dev_sem.
 *
 * \param obd  MDS device
 * \param lmm  striping descriptor; its fields are read with le32_to_cpu
 *
 * NOTE(review): the NULL-lmm early exit announced by the comment below, the
 * case labels and the returned values are not visible in this view.
 */
217 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
219 struct lov_ost_data_v1 *data;
224 /* if we create file without objects - lmm is NULL */
228 switch (le32_to_cpu(lmm->lmm_magic)) {
230 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
231 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
234 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
235 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
238 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* the per-stripe updates run with obd_dev_sem held */
243 mutex_down(&obd->obd_dev_sem);
244 for (j = 0; j < count; j++) {
245 __u32 i = le32_to_cpu(data[j].l_ost_idx);
246 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
251 mutex_up(&obd->obd_dev_sem);
255 EXPORT_SYMBOL(mds_lov_prepare_objids);
/*
 * Compare the object id of every stripe in \a lmm with the cached value
 * for that stripe's OST and mark the owning page dirty in
 * mds_lov_page_dirty when the new id is larger.
 *
 * \param obd  MDS device
 * \param lmm  striping descriptor (v1 or v3 layout, little-endian fields)
 *
 * NOTE(review): the store of the new id into data[idx] is not visible in
 * this view (presumably it sits between the compare and the bitmap set).
 * NOTE(review): no NULL check on the page fetched from mds_lov_page_array
 * is visible - presumably the page was allocated earlier, e.g. by
 * mds_lov_prepare_objids(); confirm against the callers.
 */
257 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
259 struct mds_obd *mds = &obd->u.mds;
261 struct lov_ost_data_v1 *obj;
265 /* if we create file without objects - lmm is NULL */
269 switch (le32_to_cpu(lmm->lmm_magic)) {
271 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
272 obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
275 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
276 obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
279 CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
283 for (j = 0; j < count; j++) {
/* i is the OST index; page/idx locate its slot in the page array */
284 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
285 obd_id id = le64_to_cpu(obj[j].l_object_id);
286 __u32 page = i / OBJID_PER_PAGE();
287 __u32 idx = i % OBJID_PER_PAGE();
290 data = mds->mds_lov_page_array[page];
292 CDEBUG(D_INODE,"update last object for ost %u"
293 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
294 if (id > data[idx]) {
296 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
302 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Account for objid entries in \a data and recompute mds_max_mdsize and
 * mds_max_cookiesize from the resulting mds_lov_objid_count.
 *
 * \param mds   MDS state to update
 * \param data  objid entries; the only visible caller passes a page that
 *              was just read from the objid file
 *
 * NOTE(review): the rest of the parameter list (the caller passes an entry
 * count as third argument), the per-entry condition inside the loop and
 * the return value are not visible in this view.
 */
305 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
311 for(i = 0; i < count; i++) {
315 mds->mds_lov_objid_count++;
/* stripe count used for sizing is capped at LOV_MAX_STRIPE_COUNT */
318 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
319 mds->mds_lov_objid_count);
321 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
322 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
324 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
325 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Load the objid file into freshly allocated in-memory pages.
 *
 * The number of pages is derived from the file size; each page is
 * allocated, filled with fsfilt_read_record() and handed to
 * mds_lov_update_from_read(). Afterwards mds_lov_objid_lastpage and
 * mds_lov_objid_lastidx are set and the state is dumped for debugging.
 *
 * \param obd  MDS device; mds_lov_objid_filp is read from its u.mds state
 *
 * Error codes set on the visible failure paths: -ENOMEM when a page cannot
 * be allocated, -EIO when mds_lov_update_from_read() reports failure.
 * NOTE(review): the handling of an empty file, the declarations of size
 * and off, and the out label are not visible in this view.
 */
331 static int mds_lov_read_objids(struct obd_device *obd)
333 struct mds_obd *mds = &obd->u.mds;
335 int i, rc, count = 0, page = 0;
339 /* Read everything in the file, even if our current lov desc
340 has fewer targets. Old targets not in the lov descriptor
341 during mds setup may still have valid objids. */
342 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/*
 * NOTE(review): the "+ 1" yields one extra page when size is an exact
 * multiple of a page's byte size, and no bound against
 * MDS_LOV_OBJID_PAGES_COUNT is visible - confirm both are intended.
 */
346 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
347 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
349 for (i = 0; i < page; i++) {
/* off_old lets the loop compute how many bytes this read consumed */
350 loff_t off_old = off;
352 LASSERT(mds->mds_lov_page_array[i] == NULL);
353 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
354 if (mds->mds_lov_page_array[i] == NULL)
355 GOTO(out, rc = -ENOMEM);
357 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
358 OBJID_PER_PAGE()*sizeof(obd_id), &off);
360 CERROR("Error reading objids %d\n", rc);
/*
 * NOTE(review): count accumulates over all pages read so far, yet it is
 * passed below together with the current page only. From the second page
 * on it can exceed the entries in that page - confirm whether a per-page
 * count was intended.
 */
364 count += (off - off_old)/sizeof(obd_id);
365 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
366 CERROR("Can't update mds data\n");
367 GOTO(out, rc = -EIO);
373 mds->mds_lov_objid_lastpage = i;
374 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
376 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
377 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
379 mds_lov_dump_objids("read",obd);
/*
 * Write every objid page flagged in mds_lov_page_dirty back to the objid
 * file with fsfilt_write_record() and clear its dirty bit.
 *
 * \param obd  MDS device
 *
 * NOTE(review): the action taken when the bitmap is empty, the remaining
 * arguments of fsfilt_write_record(), the handling of its result and the
 * return value are not visible in this view.
 */
384 int mds_lov_write_objids(struct obd_device *obd)
386 struct mds_obd *mds = &obd->u.mds;
390 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
393 mds_lov_dump_objids("write", obd);
395 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
396 obd_id *data = mds->mds_lov_page_array[i];
/* default to a full page; the file offset is the page index times that size */
397 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
398 loff_t off = i * size;
400 LASSERT(data != NULL);
402 /* check for partially filled last page */
403 if (i == mds->mds_lov_objid_lastpage)
404 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
406 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
410 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
417 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the cached last object id for one OST index is populated.
 *
 * When the cached slot is 0 the value is requested from the OSC export
 * with obd_get_info(KEY_LAST_ID); the reply is written straight into the
 * slot through lastid.data, and the page is marked dirty.
 *
 * \param obd  MDS device
 *
 * NOTE(review): the rest of the parameter list (an index named idx is used
 * below), the remaining lastid fields, the error handling after
 * obd_get_info() and the return value are not visible in this view.
 */
419 static int mds_lov_get_objid(struct obd_device * obd,
422 struct mds_obd *mds = &obd->u.mds;
/* locate the slot for idx: page number and offset within the page */
429 page = idx / OBJID_PER_PAGE();
430 off = idx % OBJID_PER_PAGE();
431 data = mds->mds_lov_page_array[page];
432 if (data[off] == 0) {
433 /* We never read this lastid; ask the osc */
434 struct obd_id_info lastid;
435 __u32 size = sizeof(lastid);
438 lastid.data = &data[off];
439 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
440 KEY_LAST_ID, &size, &lastid, NULL);
444 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Issue an obd_create() flagged OBD_FL_DELORPHAN on the OSC export so the
 * OST(s) reconcile their objects with the MDS (see the comment below).
 *
 * \param mds       MDS state; mds_osc_exp and mds_id are read from it
 * \param ost_uuid  when non-NULL it is stored in oti.oti_ost_uuid before
 *                  the call; when NULL that field keeps its zero value
 *
 * NOTE(review): the declaration of oa, the handling of the obd_create()
 * result and the return value are not visible in this view.
 */
450 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
454 struct obd_trans_info oti = {0};
455 struct lov_stripe_md *empty_ea = NULL;
458 LASSERT(mds->mds_lov_page_array != NULL);
460 /* This create will in fact either create or destroy: If the OST is
461 * missing objects below this ID, they will be created. If it finds
462 * objects above this ID, they will be removed. */
463 memset(&oa, 0, sizeof(oa));
464 oa.o_flags = OBD_FL_DELORPHAN;
465 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
466 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
467 if (ost_uuid != NULL)
468 oti.oti_ost_uuid = ost_uuid;
469 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send an obd_id_info record to the OSC export with
 * obd_set_info_async(KEY_NEXT_ID). Asserts that the device is not in
 * recovery.
 *
 * \param obd  MDS device
 * \param idx  OST index
 * \param id   pointer to the object id for that OST
 *
 * NOTE(review): how info is filled from idx and id, the condition guarding
 * the CERROR and the return value are not visible in this view.
 */
475 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
477 struct mds_obd *mds = &obd->u.mds;
479 struct obd_id_info info;
482 LASSERT(!obd->obd_recovering);
486 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
487 KEY_NEXT_ID, sizeof(info), &info, NULL);
489 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
495 /* Update the lov desc for a new size lov. */
/*
 * Visible steps: fetch the current descriptor from the OSC export with
 * obd_get_info(KEY_LOVDESC) into a temporary buffer, copy it into
 * mds->mds_lov_desc, register idx through mds_lov_update_max_ost() under
 * obd_dev_sem, call llog_cat_initialize() for the target, and invoke the
 * registered upcall with OBD_NOTIFY_ACTIVE when one is set. The temporary
 * buffer is freed at the end.
 *
 * \param obd   MDS device
 * \param idx   index of the target being added or updated
 * \param uuid  uuid of that target (logged and passed to the llog setup)
 *
 * NOTE(review): the declaration of ld, the error checks between the steps
 * and the return value are not visible in this view.
 */
496 static int mds_lov_update_desc(struct obd_device *obd, int idx,
497 struct obd_uuid *uuid)
499 struct mds_obd *mds = &obd->u.mds;
501 __u32 valsize = sizeof(mds->mds_lov_desc);
505 OBD_ALLOC(ld, sizeof(*ld));
509 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
514 /* Don't change the mds_lov_desc until the objids size matches the
516 mds->mds_lov_desc = *ld;
517 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
518 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
520 mutex_down(&obd->obd_dev_sem);
521 rc = mds_lov_update_max_ost(mds, idx);
522 mutex_up(&obd->obd_dev_sem);
527 /* If we added a target we have to reconnect the llogs */
528 /* We only _need_ to do this at first add (idx), or the first time
529 after recovery. However, it should now be safe to call anytime. */
530 rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
534 /*XXX this notifies the MDD until lov handling use old mds code */
535 if (obd->obd_upcall.onu_owner) {
536 LASSERT(obd->obd_upcall.onu_upcall != NULL);
537 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
538 obd->obd_upcall.onu_owner);
/* the temporary descriptor buffer is released here */
541 OBD_FREE(ld, sizeof(*ld));
545 /* Inform MDS about new/updated target */
/*
 * Visible steps: refresh the lov descriptor for the watched target via
 * mds_lov_update_desc(), reject an index that is not below ld_tgt_count
 * with -EINVAL, make sure the cached last id exists (mds_lov_get_objid)
 * and push it to the OSC with mds_lov_set_one_nextid().
 *
 * \param obd      MDS device
 * \param watched  target device; its cl_target_uuid names the target
 *
 * NOTE(review): the rest of the parameter list (an index named idx is used
 * below), the action taken while obd_recovering is set, the error checks
 * and the return value are not visible in this view.
 */
546 static int mds_lov_update_mds(struct obd_device *obd,
547 struct obd_device *watched,
550 struct mds_obd *mds = &obd->u.mds;
558 /* Don't let anyone else mess with mds_lov_objids now */
559 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
563 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
564 idx, obd->obd_recovering, obd->obd_async_recov,
565 mds->mds_lov_desc.ld_tgt_count);
567 /* idx is set as data from lov_notify. */
568 if (obd->obd_recovering)
571 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
572 CERROR("index %d > count %d!\n", idx,
573 mds->mds_lov_desc.ld_tgt_count);
574 GOTO(out, rc = -EINVAL);
577 rc = mds_lov_get_objid(obd, idx);
/* locate the cached slot for idx so its address can be handed to the OSC */
581 page = idx / OBJID_PER_PAGE();
582 off = idx % OBJID_PER_PAGE();
583 data = mds->mds_lov_page_array[page];
585 /* We have read this lastid from disk; tell the osc.
586 Don't call this during recovery. */
587 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
589 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
590 /* Don't abort the rest of the sync */
593 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * Connect the MDS to the LOV device named \a lov_name.
 *
 * Visible steps: look the LOV up with class_name2obd(), load the objid
 * file under obd_dev_sem, register the MDS as observer of the LOV, mark
 * the LOV as used by an MDT (lov_sp_me), build an obd_connect_data
 * request, call obd_connect() and store the resulting export in
 * mds->mds_osc_exp.
 *
 * \param obd       MDS device
 * \param lov_name  name of the LOV device to connect to
 *
 * mds->mds_osc_obd doubles as an error carrier: a failed lookup stores
 * ERR_PTR(-ENOTCONN) and a failed connect stores ERR_PTR(rc); a later call
 * that finds an error pointer there returns PTR_ERR() of it at once.
 * NOTE(review): the action taken when mds_osc_obd is already set, the
 * error checks between the steps, the labels of the final unwind and the
 * success return value are not visible in this view.
 */
600 /* update the LOV-OSC knowledge of the last used object id's */
601 int mds_lov_connect(struct obd_device *obd, char * lov_name)
603 struct mds_obd *mds = &obd->u.mds;
604 struct lustre_handle conn = {0,};
605 struct obd_connect_data *data;
/* a previous failure is sticky: report it again without retrying */
609 if (IS_ERR(mds->mds_osc_obd))
610 RETURN(PTR_ERR(mds->mds_osc_obd));
612 if (mds->mds_osc_obd)
615 mds->mds_osc_obd = class_name2obd(lov_name);
616 if (!mds->mds_osc_obd) {
617 CERROR("MDS cannot locate LOV %s\n", lov_name);
618 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* the objid file is read with obd_dev_sem held */
622 mutex_down(&obd->obd_dev_sem);
623 rc = mds_lov_read_objids(obd);
624 mutex_up(&obd->obd_dev_sem);
626 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
630 rc = obd_register_observer(mds->mds_osc_obd, obd);
632 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
637 mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
/* connect request; it is freed right after obd_connect() returns */
639 OBD_ALLOC(data, sizeof(*data));
642 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
643 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
644 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
645 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
646 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
648 #ifdef HAVE_LRU_RESIZE_SUPPORT
649 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
651 data->ocd_version = LUSTRE_VERSION_CODE;
652 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
653 /* send max bytes per rpc */
654 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
655 /* send the list of supported checksum types */
656 data->ocd_cksum_types = OBD_CKSUM_ALL;
657 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
658 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
659 OBD_FREE(data, sizeof(*data));
661 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
662 mds->mds_osc_obd = ERR_PTR(rc);
665 mds->mds_osc_exp = class_conn2export(&conn);
667 /* I want to see a callback happen when the OBD moves to a
668 * "For General Use" state, and that's when we'll call
669 * set_nextid(). The class driver can help us here, because
670 * it can use the obd_recovering flag to determine when
671 * the OBD is fully available. */
672 /* MDD device will care about that
673 if (!obd->obd_recovering)
674 rc = mds_postrecov(obd);
679 mds->mds_osc_exp = NULL;
680 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Stop observing the LOV and flag it for forced cleanup.
 *
 * When mds_osc_obd is not an error pointer and an export exists, the
 * observer registration is removed (observer set to NULL) and obd_force is
 * set on the LOV device; the reason is given in the comment below.
 *
 * \param obd  MDS device
 *
 * NOTE(review): the return value is not visible in this view.
 */
684 int mds_lov_disconnect(struct obd_device *obd)
686 struct mds_obd *mds = &obd->u.mds;
690 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
691 obd_register_observer(mds->mds_osc_obd, NULL);
693 /* The actual disconnect of the mds_lov will be called from
694 * class_disconnect_exports from mds_lov_clean. So we have to
695 * ensure that class_cleanup doesn't fail due to the extra ref
696 * we're holding now. The mechanism to do that already exists -
697 * the obd_force flag. We'll drop the final ref to the
698 * mds_osc_exp in mds_cleanup. */
699 mds->mds_osc_obd->obd_force = 1;
/*
 * Argument bundle for a synchronize request: allocated and filled in
 * mds_lov_start_synchronize() and consumed by __mds_lov_synchronize(),
 * either directly or through the mds_lov_synchronize() thread entry point.
 */
705 struct mds_lov_sync_info {
706 struct obd_device *mlsi_obd; /* the lov device to sync */
707 struct obd_device *mlsi_watched; /* target osc */
708 __u32 mlsi_index; /* index of target */
/*
 * Send the two entries of mds->mds_capa_keys to the OSC export, one
 * obd_set_info_async(KEY_CAPA_KEY) call per key.
 *
 * \param mds  MDS state; mds_capa_keys and mds_osc_exp are read from it
 *
 * NOTE(review): the action taken when mds_capa_keys is NULL, the condition
 * guarding the failure message and the return value are not visible in
 * this view.
 */
711 static int mds_propagate_capa_keys(struct mds_obd *mds)
713 struct lustre_capa_key *key;
718 if (!mds->mds_capa_keys)
/* the loop bound is fixed at 2 keys */
721 for (i = 0; i < 2; i++) {
722 key = &mds->mds_capa_keys[i];
723 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
725 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
726 KEY_CAPA_KEY, sizeof(*key), key, NULL);
728 DEBUG_CAPA_KEY(D_ERROR, key,
729 "propagate failed (rc = %d) for", rc);
/*
 * Worker that synchronizes the MDS with one target.
 *
 * Visible sequence, run with mds_notify_lock held for reading:
 *   1. bail out with -ENODEV when the device is stopping or failed;
 *   2. mds_lov_update_mds() for the target index;
 *   3. send the MDS group to the OSC export (KEY_MDS_CONN);
 *   4. propagate the capability keys;
 *   5. fetch the LLOG_MDS_OST_ORIG_CTXT context and llog_connect() it;
 *   6. mds_lov_clear_orphans() for the target uuid;
 *   7. invoke the registered upcall with OBD_NOTIFY_ACTIVE, if any.
 * After the lock is dropped the failure handling logs the error and, when
 * nothing involved is stopping, notifies the LOV with OBD_NOTIFY_INACTIVE.
 * The obd reference tagged "mds_lov_synchronize" is dropped at the end.
 *
 * \param data  a struct mds_lov_sync_info pointer
 *
 * NOTE(review): the error checks between the steps, the out label, the
 * release of the sync-info and llog context, and the return value are not
 * visible in this view.
 */
737 /* We only sync one osc at a time, so that we don't have to hold
738 any kind of lock on the whole mds_lov_desc, which may change
739 (grow) as a result of mds_lov_add_ost. This also avoids any
740 kind of mismatch between the lov_desc and the mds_lov_desc,
741 which are not in lock-step during lov_add_obd */
742 static int __mds_lov_synchronize(void *data)
744 struct mds_lov_sync_info *mlsi = data;
745 struct obd_device *obd = mlsi->mlsi_obd;
746 struct obd_device *watched = mlsi->mlsi_watched;
747 struct mds_obd *mds = &obd->u.mds;
748 struct obd_uuid *uuid;
749 __u32 idx = mlsi->mlsi_index;
750 struct mds_group_info mgi;
751 struct llog_ctxt *ctxt;
759 uuid = &watched->u.cli.cl_target_uuid;
762 down_read(&mds->mds_notify_lock);
763 if (obd->obd_stopping || obd->obd_fail)
764 GOTO(out, rc = -ENODEV);
766 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
767 rc = mds_lov_update_mds(obd, watched, idx);
769 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
772 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
775 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
776 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
779 /* propagate capability keys */
780 rc = mds_propagate_capa_keys(mds);
784 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
786 GOTO(out, rc = -ENODEV);
788 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
789 rc = llog_connect(ctxt, NULL, NULL, uuid);
792 CERROR("%s failed at llog_origin_connect: %d\n",
793 obd_uuid2str(uuid), rc);
797 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
798 obd->obd_name, obd_uuid2str(uuid));
799 rc = mds_lov_clear_orphans(mds, uuid);
801 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
802 obd_uuid2str(uuid), rc);
806 if (obd->obd_upcall.onu_owner) {
808 * This is a hack for mds_notify->mdd_notify. When the mds obd
809 * in mdd is removed, This hack should be removed.
811 LASSERT(obd->obd_upcall.onu_upcall != NULL);
812 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
813 obd->obd_upcall.onu_owner);
817 up_read(&mds->mds_notify_lock);
819 /* Deactivate it for safety */
820 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
822 if (!obd->obd_stopping && mds->mds_osc_obd &&
823 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
824 obd_notify(mds->mds_osc_obd, watched,
825 OBD_NOTIFY_INACTIVE, NULL);
/* pairs with the class_incref() taken in mds_lov_start_synchronize() */
828 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background synchronize: names the thread
 * "ll_sync_<index>", daemonizes it and runs __mds_lov_synchronize().
 *
 * \param data  a struct mds_lov_sync_info pointer, passed on unchanged
 *
 * Returns whatever __mds_lov_synchronize() returns.
 */
832 int mds_lov_synchronize(void *data)
834 struct mds_lov_sync_info *mlsi = data;
/* the index is printed with at least two digits, e.g. "ll_sync_03" */
837 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
838 ptlrpc_daemonize(name);
840 RETURN(__mds_lov_synchronize(data));
/*
 * Package a synchronize request for one target and run it.
 *
 * Allocates a struct mds_lov_sync_info, fills it from the arguments, takes
 * an obd reference tagged "mds_lov_synchronize", and then either starts
 * mds_lov_synchronize() in a kernel thread or calls
 * __mds_lov_synchronize() directly.
 *
 * \param obd       MDS device
 * \param watched   target device
 * \param data      pointer to a __u32 holding the target index
 * \param nonblock  presumably selects the background-thread path; the
 *                  branch condition is not visible here - confirm
 *
 * When the thread cannot be started the error is logged and the obd
 * reference is dropped again.
 * NOTE(review): the allocation-failure handling, the release of the
 * sync-info on the thread-start failure path and the return value are not
 * visible in this view.
 */
843 int mds_lov_start_synchronize(struct obd_device *obd,
844 struct obd_device *watched,
845 void *data, int nonblock)
847 struct mds_lov_sync_info *mlsi;
849 struct obd_uuid *uuid;
853 uuid = &watched->u.cli.cl_target_uuid;
855 OBD_ALLOC(mlsi, sizeof(*mlsi));
860 mlsi->mlsi_obd = obd;
861 mlsi->mlsi_watched = watched;
862 mlsi->mlsi_index = *(__u32 *)data;
864 /* Although class_export_get(obd->obd_self_export) would lock
865 the MDS in place, since it's only a self-export
866 it doesn't lock the LOV in place. The LOV can be disconnected
867 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
868 Simply taking an export ref on the LOV doesn't help, because it's
869 still disconnected. Taking an obd reference insures that we don't
870 disconnect the LOV. This of course means a cleanup won't
871 finish for as long as the sync is blocking. */
872 class_incref(obd, "mds_lov_synchronize", obd);
875 /* Synchronize in the background */
876 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
877 CLONE_VM | CLONE_FILES);
879 CERROR("%s: error starting mds_lov_synchronize: %d\n",
881 class_decref(obd, "mds_lov_synchronize", obd);
883 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
884 "thread=%d\n", obd->obd_name,
885 mlsi->mlsi_index, rc);
/* synchronous path: run the worker in the caller's context */
889 rc = __mds_lov_synchronize((void *)mlsi);
895 int mds_notify(struct obd_device *obd, struct obd_device *watched,
896 enum obd_notify_event ev, void *data)
901 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
904 /* We only handle these: */
905 case OBD_NOTIFY_ACTIVE:
906 /* lov want one or more _active_ targets for work */
907 /* activate event should be pass lov idx as argument */
908 case OBD_NOTIFY_SYNC:
909 case OBD_NOTIFY_SYNC_NONBLOCK:
910 /* sync event should be pass lov idx as argument */
912 case OBD_NOTIFY_CONFIG:
917 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
918 CERROR("unexpected notification of %s %s!\n",
919 watched->obd_type->typ_name, watched->obd_name);
923 if (obd->obd_recovering) {
924 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
926 obd_uuid2str(&watched->u.cli.cl_target_uuid));
927 /* We still have to fix the lov descriptor for ost's added
928 after the mdt in the config log. They didn't make it into
930 rc = mds_lov_update_desc(obd, *(__u32 *)data,
931 &watched->u.cli.cl_target_uuid);
935 rc = mds_lov_start_synchronize(obd, watched, data,
936 !(ev == OBD_NOTIFY_SYNC));