1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
53 #include "mds_internal.h"
/*
 * Debug helper: logs the dirty-page bitmap and the cached object ids of this
 * MDS at D_INFO level.
 *
 * \param label  free-form tag printed first so the caller can be identified
 * \param obd    MDS device whose obd->u.mds state is dumped
 *
 * A NULL bitmap or a NULL page array is reported through CERROR.
 */
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 struct mds_obd *mds = &obd->u.mds;
60 CDEBUG(D_INFO, "dump from %s\n", label);
61 if (mds->mds_lov_page_dirty == NULL) {
62 CERROR("NULL bitmap!\n");
/* Print the dirty bitmap one long word at a time.
 * NOTE(review): the "+1" visits one word beyond size / BITS_PER_LONG even
 * when size is an exact multiple of BITS_PER_LONG - confirm the bitmap
 * allocation always provides that word. */
66 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 if (mds->mds_lov_page_array == NULL) {
70 CERROR("not init page array!\n");
/* Walk every slot of the page pointer array. */
74 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75 obd_id *data = mds->mds_lov_page_array[i];
/* Each page holds OBJID_PER_PAGE() object ids. */
80 for(j=0; j < OBJID_PER_PAGE(); j++) {
/* NOTE(review): obd_id is printed with %llu here while other messages in
 * this file use LPU64 - confirm both agree on every supported platform. */
83 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Sets up the in-memory objid cache of an MDS device: allocates the dirty
 * bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the page pointer array, then
 * opens (creating it if needed) the LOV_OBJID file and stores the handle in
 * mds->mds_lov_objid_filp.
 *
 * \param obd  MDS device to initialise
 *
 * \retval -ENOMEM        the page pointer array could not be allocated
 * \retval PTR_ERR(file)  filp_open() failed
 * \retval -ENOENT        LOV_OBJID exists but is not a regular file
 *
 * The cleanup statements at the end undo the steps in reverse order: close
 * the file, free the page pointer array, free the bitmap.
 * NOTE(review): those paths free mds_lov_page_array / mds_lov_page_dirty
 * without visibly resetting the pointers, while mds_lov_destroy_objids()
 * tests mds_lov_page_array against NULL - confirm destroy is never run after
 * a failed init.
 */
90 int mds_lov_init_objids(struct obd_device *obd)
92 struct mds_obd *mds = &obd->u.mds;
/* Byte size of the page pointer array (one pointer per objid page). */
93 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* One allocation must hold a whole number of obd_id entries. */
98 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101 if (mds->mds_lov_page_dirty == NULL)
105 OBD_ALLOC(mds->mds_lov_page_array, size);
106 if (mds->mds_lov_page_array == NULL)
107 GOTO(err_free_bitmap, rc = -ENOMEM);
109 /* open and test the lov objd file */
110 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed here and (re)assigned in the GOTO just
 * below - confirm rc already holds PTR_ERR(file) when the message is
 * formatted. */
113 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114 GOTO(err_free, rc = PTR_ERR(file));
/* Anything other than a regular file is rejected with -ENOENT. */
116 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118 file->f_dentry->d_inode->i_mode);
119 GOTO(err_open, rc = -ENOENT);
121 mds->mds_lov_objid_filp = file;
/* Error unwinding: a failing close is only logged. */
125 if (filp_close((struct file *)file, 0))
126 CERROR("can't close %s after error\n", LOV_OBJID);
128 OBD_FREE(mds->mds_lov_page_array, size);
130 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Releases the objid cache of an MDS device: every objid page, the page
 * pointer array, the LOV_OBJID file handle and the dirty bitmap.
 *
 * \param obd  MDS device being torn down
 *
 * A failing filp_close() is only logged. mds_lov_objid_filp is reset to NULL;
 * NOTE(review): mds_lov_page_array and mds_lov_page_dirty are not visibly
 * reset after being freed - confirm nothing reads them afterwards.
 */
135 void mds_lov_destroy_objids(struct obd_device *obd)
137 struct mds_obd *mds = &obd->u.mds;
141 if (mds->mds_lov_page_array != NULL) {
/* Free each objid page, then the array of page pointers itself. */
142 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143 obd_id *data = mds->mds_lov_page_array[i];
145 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
147 OBD_FREE(mds->mds_lov_page_array,
148 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
151 if (mds->mds_lov_objid_filp) {
152 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
/* The handle is dropped whether or not the close succeeded. */
153 mds->mds_lov_objid_filp = NULL;
155 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
158 FREE_BITMAP(mds->mds_lov_page_dirty);
163 * There are currently two ways to learn the OST count and the max OST index:
164 * first - after an OST has connected to the MDS and the sync process finished;
165 * second - from the lmm during recovery, when the MDS does not have configs
166 * and the OST isn't registered in the MGS.
168 * \param mds pointer to mds structure
169 * \param index maximum ost index
171 * \retval -ENOMEM if there is not enough memory for a new page
172 * \retval 0 if the update succeeded
/*
 * Makes sure the objid page covering OST index \a index is present in
 * mds_lov_page_array, remembers \a index as the largest index seen so far,
 * and - when the slot for that index is still zero - counts a new target and
 * recomputes mds_max_mdsize / mds_max_cookiesize.
 *
 * The callers visible in this file take obd->obd_dev_sem around the call.
 * NOTE(review): "page" is used to index mds_lov_page_array without a visible
 * check against MDS_LOV_OBJID_PAGES_COUNT - confirm callers bound the index.
 */
174 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* Split the OST index into a page number and an offset inside that page. */
176 __u32 page = index / OBJID_PER_PAGE();
177 __u32 off = index % OBJID_PER_PAGE();
178 obd_id *data = mds->mds_lov_page_array[page];
/* Allocate the page and publish it in the page pointer array. */
181 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
185 mds->mds_lov_page_array[page] = data;
/* Track the highest index together with its page/offset position. */
188 if (index > mds->mds_lov_objid_max_index) {
189 mds->mds_lov_objid_lastpage = page;
190 mds->mds_lov_objid_lastidx = off;
191 mds->mds_lov_objid_max_index = index;
194 /* workaround - New target not in objids file; increase mdsize */
195 /* ld_tgt_count is used as the max index everywhere, despite its name. */
196 if (data[off] == 0) {
200 mds->mds_lov_objid_count++;
/* The stripe count used for sizing is capped at LOV_MAX_STRIPE_COUNT. */
201 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
202 mds->mds_lov_objid_count);
204 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
205 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
207 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
208 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
209 mds->mds_max_cookiesize);
/*
 * Prepares the objid cache for every stripe named in \a lmm: each stripe's
 * OST index is passed to mds_lov_update_max_ost() while obd->obd_dev_sem is
 * held.
 *
 * \param obd  MDS device
 * \param lmm  on-disk (little-endian) striping EA; the v1 and v3 layouts are
 *             decoded, any other magic is reported through CERROR
 */
216 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
218 struct lov_ost_data_v1 *data;
223 /* if we create file without objects - lmm is NULL */
/* Pick the stripe count and the object array according to the EA layout. */
227 switch (le32_to_cpu(lmm->lmm_magic)) {
229 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
230 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
233 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
234 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
237 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* obd_dev_sem covers the whole walk over the stripes. */
242 mutex_down(&obd->obd_dev_sem);
243 for (j = 0; j < count; j++) {
244 __u32 i = le32_to_cpu(data[j].l_ost_idx);
/* A non-zero result means the page for this OST could not be set up. */
245 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
250 mutex_up(&obd->obd_dev_sem);
254 EXPORT_SYMBOL(mds_lov_prepare_objids);
/*
 * Compares the object id of every stripe in \a lmm with the id cached for the
 * stripe's OST and, when the new id is larger, marks the containing page
 * dirty in mds_lov_page_dirty.
 *
 * \param obd  MDS device
 * \param lmm  on-disk (little-endian) striping EA, v1 or v3 layout
 *
 * NOTE(review): the page pointer is dereferenced without a visible NULL
 * check; presumably mds_lov_prepare_objids() has already been run for the
 * same lmm - confirm with callers.
 */
256 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
258 struct mds_obd *mds = &obd->u.mds;
260 struct lov_ost_data_v1 *obj;
264 /* if we create file without objects - lmm is NULL */
/* Pick the stripe count and the object array according to the EA layout. */
268 switch (le32_to_cpu(lmm->lmm_magic)) {
270 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
271 obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
274 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
275 obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
278 CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
282 for (j = 0; j < count; j++) {
/* i is the OST index, id the object id stored for this stripe. */
283 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
284 obd_id id = le64_to_cpu(obj[j].l_object_id);
/* Locate the cache slot of OST i: page number and index in the page. */
285 __u32 page = i / OBJID_PER_PAGE();
286 __u32 idx = i % OBJID_PER_PAGE();
289 data = mds->mds_lov_page_array[page];
291 CDEBUG(D_INODE,"update last object for ost %u"
292 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
/* Only a larger id makes the page dirty. */
293 if (id > data[idx]) {
295 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
301 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Post-processes object ids that were just read from the objid file: walks
 * up to "count" entries of \a data, bumps mds->mds_lov_objid_count, and then
 * recomputes mds_max_mdsize / mds_max_cookiesize from the resulting target
 * count (capped at LOV_MAX_STRIPE_COUNT).
 *
 * The only caller visible in this file is mds_lov_read_objids(), which treats
 * a non-zero return value as an error.
 */
304 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
310 for(i = 0; i < count; i++) {
314 mds->mds_lov_objid_count++;
/* Same sizing rule as in mds_lov_update_max_ost(). */
317 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
318 mds->mds_lov_objid_count);
320 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
321 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
323 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
324 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Loads the objid file (mds->mds_lov_objid_filp) into mds_lov_page_array,
 * one MDS_LOV_ALLOC_SIZE page at a time. Each page read is handed to
 * mds_lov_update_from_read(); afterwards the last page / last index markers
 * are set and the cache is dumped to the debug log.
 *
 * \param obd  MDS device
 *
 * \retval -ENOMEM  a page could not be allocated
 * \retval -EIO     mds_lov_update_from_read() reported a failure
 *
 * The caller visible in this file, mds_lov_connect(), holds obd->obd_dev_sem
 * around the call.
 */
330 static int mds_lov_read_objids(struct obd_device *obd)
332 struct mds_obd *mds = &obd->u.mds;
334 int i, rc, count = 0, page = 0;
338 /* Read everything in the file, even if our current lov desc
339 has fewer targets. Old targets not in the lov descriptor
340 during mds setup may still have valid objids. */
341 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Number of pages to read.
 * NOTE(review): the "+ 1" yields one extra page when the file size is an
 * exact multiple of the page payload - confirm this is intended. */
345 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
346 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
348 for (i = 0; i < page; i++) {
/* Remember the offset so the amount actually read can be derived. */
349 loff_t off_old = off;
/* A slot must not be populated twice. */
351 LASSERT(mds->mds_lov_page_array[i] == NULL);
352 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
353 if (mds->mds_lov_page_array[i] == NULL)
354 GOTO(out, rc = -ENOMEM);
356 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
357 OBJID_PER_PAGE()*sizeof(obd_id), &off);
359 CERROR("Error reading objids %d\n", rc);
/* NOTE(review): count accumulates over all pages read so far, yet it is
 * passed below as the number of entries of the single page i; for i > 0
 * it can exceed OBJID_PER_PAGE() - confirm mds_lov_update_from_read()
 * is meant to receive the per-page count instead. */
363 count += (off - off_old)/sizeof(obd_id);
364 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
365 CERROR("Can't update mds data\n");
366 GOTO(out, rc = -EIO);
/* NOTE(review): if the loop runs to completion i equals "page" here, i.e.
 * one past the last page that was filled - confirm the intended value. */
372 mds->mds_lov_objid_lastpage = i;
373 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
375 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
376 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
378 mds_lov_dump_objids("read",obd);
/*
 * Writes every objid page that is flagged in mds_lov_page_dirty back to the
 * objid file and clears the page's dirty bit.
 *
 * \param obd  MDS device
 *
 * Page i is written at file offset i * (OBJID_PER_PAGE() * sizeof(obd_id)).
 * The page recorded as mds_lov_objid_lastpage is written only up to and
 * including entry mds_lov_objid_lastidx.
 */
383 int mds_lov_write_objids(struct obd_device *obd)
385 struct mds_obd *mds = &obd->u.mds;
/* Nothing is dirty: no write is needed. */
389 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
392 mds_lov_dump_objids("write", obd);
394 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
395 obd_id *data = mds->mds_lov_page_array[i];
396 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
397 loff_t off = i * size;
/* A dirty bit without a backing page is treated as a programming error. */
399 LASSERT(data != NULL);
401 /* check for a partially filled last page */
402 if (i == mds->mds_lov_objid_lastpage)
403 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
405 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
409 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
416 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Makes sure the cache holds a last object id for one OST: when the cached
 * value is still 0 the id is requested from the OSC export through
 * obd_get_info(KEY_LAST_ID), which is given a pointer to the cache slot, and
 * the page is marked dirty.
 *
 * The caller visible in this file, mds_lov_update_mds(), passes the OST
 * index and checks the returned status.
 */
418 static int mds_lov_get_objid(struct obd_device * obd,
421 struct mds_obd *mds = &obd->u.mds;
/* Locate the cache slot of the OST: page number and offset in the page. */
428 page = idx / OBJID_PER_PAGE();
429 off = idx % OBJID_PER_PAGE();
430 data = mds->mds_lov_page_array[page];
431 if (data[off] == 0) {
432 /* We never read this lastid; ask the osc */
433 struct obd_id_info lastid;
434 __u32 size = sizeof(lastid);
/* The answer is to be stored directly into the cache slot. */
437 lastid.data = &data[off];
438 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
439 KEY_LAST_ID, &size, &lastid, NULL);
443 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Issues an obd_create() flagged OBD_FL_DELORPHAN on the OSC export so that
 * the OST side reconciles its objects with the MDS view (see the comment in
 * the body).
 *
 * \param mds       MDS state; mds_lov_page_array must already be set up
 * \param ost_uuid  when not NULL it is stored in oti.oti_ost_uuid for the
 *                  request
 */
449 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
453 struct obd_trans_info oti = {0};
454 struct lov_stripe_md *empty_ea = NULL;
457 LASSERT(mds->mds_lov_page_array != NULL);
459 /* This create will in fact either create or destroy: If the OST is
460 * missing objects below this ID, they will be created. If it finds
461 * objects above this ID, they will be removed. */
462 memset(&oa, 0, sizeof(oa));
463 oa.o_flags = OBD_FL_DELORPHAN;
/* The object group is derived from the MDS id. */
464 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
/* Only the flags and the group field of oa are marked valid. */
465 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
466 if (ost_uuid != NULL)
467 oti.oti_ost_uuid = ost_uuid;
468 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Sends KEY_NEXT_ID with an obd_id_info payload to the OSC export through
 * obd_set_info_async(). A failure is logged with CERROR.
 *
 * Must not be called while the device is recovering (asserted below).
 * The caller visible in this file passes the OST index as \a idx and a
 * pointer to the cached object id of that OST as \a id.
 */
474 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
476 struct mds_obd *mds = &obd->u.mds;
478 struct obd_id_info info;
481 LASSERT(!obd->obd_recovering);
485 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
486 KEY_NEXT_ID, sizeof(info), &info, NULL);
488 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
494 /* Update the lov desc for a new size lov. */
/*
 * Steps performed:
 *  - fetch the current descriptor from the LOV (KEY_LOVDESC) into a
 *    temporary buffer and copy it into mds->mds_lov_desc;
 *  - under obd->obd_dev_sem, grow the objid cache for \a idx through
 *    mds_lov_update_max_ost();
 *  - (re)initialise the llog catalog for the target via
 *    llog_cat_initialize();
 *  - if an upcall owner is registered, notify it with OBD_NOTIFY_ACTIVE;
 *  - release the temporary descriptor buffer.
 *
 * \param obd   MDS device
 * \param idx   index of the target that was added or updated
 * \param uuid  uuid of that target (used for logging and for the llog setup)
 */
495 static int mds_lov_update_desc(struct obd_device *obd, int idx,
496 struct obd_uuid *uuid)
498 struct mds_obd *mds = &obd->u.mds;
500 __u32 valsize = sizeof(mds->mds_lov_desc);
/* Temporary buffer that receives the descriptor from the LOV. */
504 OBD_ALLOC(ld, sizeof(*ld));
508 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
513 /* Don't change the mds_lov_desc until the objids size matches the
515 mds->mds_lov_desc = *ld;
516 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
517 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
519 mutex_down(&obd->obd_dev_sem);
520 rc = mds_lov_update_max_ost(mds, idx);
521 mutex_up(&obd->obd_dev_sem);
526 /* If we added a target we have to reconnect the llogs */
527 /* We only _need_ to do this at first add (idx), or the first time
528 after recovery. However, it should now be safe to call anytime. */
529 rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
533 /*XXX this notifies the MDD until lov handling use old mds code */
534 if (obd->obd_upcall.onu_owner) {
/* An owner without an upcall function is a programming error. */
535 LASSERT(obd->obd_upcall.onu_upcall != NULL);
536 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
537 obd->obd_upcall.onu_owner);
540 OBD_FREE(ld, sizeof(*ld));
544 /* Inform MDS about new/updated target */
/*
 * Refreshes the lov descriptor for the watched target (its
 * u.cli.cl_target_uuid) and then validates \a idx against
 * mds_lov_desc.ld_tgt_count, makes sure a last object id is cached for the
 * target (mds_lov_get_objid) and pushes that id to the OSC as the next id
 * (mds_lov_set_one_nextid).
 *
 * \retval -EINVAL  idx is not below mds_lov_desc.ld_tgt_count
 *
 * A failure to set the next id is logged but, per the comment in the body,
 * is not meant to abort the rest of the sync.
 */
545 static int mds_lov_update_mds(struct obd_device *obd,
546 struct obd_device *watched,
549 struct mds_obd *mds = &obd->u.mds;
557 /* Don't let anyone else mess with mds_lov_objids now */
558 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
562 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
563 idx, obd->obd_recovering, obd->obd_async_recov,
564 mds->mds_lov_desc.ld_tgt_count);
566 /* idx is set as data from lov_notify. */
567 if (obd->obd_recovering)
/* NOTE(review): the test is ">=" while the message below prints ">". */
570 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
571 CERROR("index %d > count %d!\n", idx,
572 mds->mds_lov_desc.ld_tgt_count);
573 GOTO(out, rc = -EINVAL);
576 rc = mds_lov_get_objid(obd, idx);
/* Locate the cache slot of the target: page number and offset. */
580 page = idx / OBJID_PER_PAGE();
581 off = idx % OBJID_PER_PAGE();
582 data = mds->mds_lov_page_array[page];
584 /* We have read this lastid from disk; tell the osc.
585 Don't call this during recovery. */
586 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
588 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
589 /* Don't abort the rest of the sync */
592 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
599 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Connects the MDS device to the LOV named \a lov_name:
 *  - a previously recorded failure (mds_osc_obd holds an ERR_PTR) is
 *    returned as is;
 *  - the LOV device is looked up by name; when it is missing,
 *    mds_osc_obd is set to ERR_PTR(-ENOTCONN);
 *  - the objid file is loaded under obd->obd_dev_sem;
 *  - the MDS registers itself as observer of the LOV;
 *  - obd_connect() is issued with the connect flags assembled below and the
 *    resulting export is stored in mds->mds_osc_exp.
 *
 * On the failure paths visible here mds_osc_obd is replaced by ERR_PTR(rc)
 * and, in the final one, mds_osc_exp is reset to NULL.
 */
600 int mds_lov_connect(struct obd_device *obd, char * lov_name)
602 struct mds_obd *mds = &obd->u.mds;
603 struct lustre_handle conn = {0,};
604 struct obd_connect_data *data;
608 if (IS_ERR(mds->mds_osc_obd))
609 RETURN(PTR_ERR(mds->mds_osc_obd));
611 if (mds->mds_osc_obd)
614 mds->mds_osc_obd = class_name2obd(lov_name);
615 if (!mds->mds_osc_obd) {
616 CERROR("MDS cannot locate LOV %s\n", lov_name);
/* Remember the failure so that later calls return it immediately. */
617 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* Load the cached object ids while holding obd_dev_sem. */
621 mutex_down(&obd->obd_dev_sem);
622 rc = mds_lov_read_objids(obd);
623 mutex_up(&obd->obd_dev_sem);
625 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
629 rc = obd_register_observer(mds->mds_osc_obd, obd);
631 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
/* The connect data is only needed for the duration of obd_connect(). */
636 OBD_ALLOC(data, sizeof(*data));
639 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
640 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
641 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
642 OBD_CONNECT_AT | OBD_CONNECT_CHANGE_QS;
643 #ifdef HAVE_LRU_RESIZE_SUPPORT
644 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
646 data->ocd_version = LUSTRE_VERSION_CODE;
647 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
648 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
649 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
650 OBD_FREE(data, sizeof(*data));
652 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
653 mds->mds_osc_obd = ERR_PTR(rc);
656 mds->mds_osc_exp = class_conn2export(&conn);
658 /* I want to see a callback happen when the OBD moves to a
659 * "For General Use" state, and that's when we'll call
660 * set_nextid(). The class driver can help us here, because
661 * it can use the obd_recovering flag to determine when
662 * the OBD is fully available. */
663 /* MDD device will care about that
664 if (!obd->obd_recovering)
665 rc = mds_postrecov(obd);
670 mds->mds_osc_exp = NULL;
671 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detaches the MDS from its LOV: when a valid LOV device and export are
 * recorded, the observer registration is removed and obd_force is set on the
 * LOV device (see the comment in the body for the reason).
 *
 * \param obd  MDS device
 */
675 int mds_lov_disconnect(struct obd_device *obd)
677 struct mds_obd *mds = &obd->u.mds;
/* mds_osc_obd may hold an ERR_PTR left behind by a failed connect. */
681 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
682 obd_register_observer(mds->mds_osc_obd, NULL);
684 /* The actual disconnect of the mds_lov will be called from
685 * class_disconnect_exports from mds_lov_clean. So we have to
686 * ensure that class_cleanup doesn't fail due to the extra ref
687 * we're holding now. The mechanism to do that already exists -
688 * the obd_force flag. We'll drop the final ref to the
689 * mds_osc_exp in mds_cleanup. */
690 mds->mds_osc_obd->obd_force = 1;
/*
 * Argument bundle for mds_lov_synchronize() / __mds_lov_synchronize(); it is
 * allocated and filled in by mds_lov_start_synchronize().
 * NOTE(review): __mds_lov_synchronize() uses mlsi_obd as the MDS device
 * (it takes &obd->u.mds from it) - confirm the "lov device" wording below.
 */
696 struct mds_lov_sync_info {
697 struct obd_device *mlsi_obd; /* the lov device to sync */
698 struct obd_device *mlsi_watched; /* target osc */
699 __u32 mlsi_index; /* index of target */
/*
 * Sends both entries of mds->mds_capa_keys to the OSC export, one
 * obd_set_info_async(KEY_CAPA_KEY) call per key. A failing call is reported
 * through DEBUG_CAPA_KEY at D_ERROR level.
 *
 * \param mds  MDS state; nothing is sent when mds_capa_keys is not set
 */
702 static int mds_propagate_capa_keys(struct mds_obd *mds)
704 struct lustre_capa_key *key;
709 if (!mds->mds_capa_keys)
/* Exactly two key slots are propagated. */
712 for (i = 0; i < 2; i++) {
713 key = &mds->mds_capa_keys[i];
714 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
716 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
717 KEY_CAPA_KEY, sizeof(*key), key, NULL);
719 DEBUG_CAPA_KEY(D_ERROR, key,
720 "propagate failed (rc = %d) for", rc);
728 /* We only sync one osc at a time, so that we don't have to hold
729 any kind of lock on the whole mds_lov_desc, which may change
730 (grow) as a result of mds_lov_add_ost. This also avoids any
731 kind of mismatch between the lov_desc and the mds_lov_desc,
732 which are not in lock-step during lov_add_obd */
/*
 * Synchronises the MDS with one target. \a data is a
 * struct mds_lov_sync_info. With mds_notify_lock held for reading the
 * function:
 *  - gives up with -ENODEV when the device is stopping or failed;
 *  - updates the MDS view of the target (mds_lov_update_mds);
 *  - sends KEY_MDS_CONN carrying the object group of this MDS;
 *  - propagates the capability keys;
 *  - connects the llog of the LLOG_MDS_OST_ORIG_CTXT context to the target
 *    (-ENODEV when that context is not available);
 *  - clears orphans on the target;
 *  - notifies a registered upcall owner with OBD_NOTIFY_ACTIVE.
 * After the lock is released a failed sync is logged and, unless one of the
 * devices involved is stopping, the target is reported as
 * OBD_NOTIFY_INACTIVE to the LOV. Finally the "mds_lov_synchronize"
 * reference on the MDS device is dropped.
 */
733 static int __mds_lov_synchronize(void *data)
735 struct mds_lov_sync_info *mlsi = data;
736 struct obd_device *obd = mlsi->mlsi_obd;
737 struct obd_device *watched = mlsi->mlsi_watched;
738 struct mds_obd *mds = &obd->u.mds;
739 struct obd_uuid *uuid;
740 __u32 idx = mlsi->mlsi_index;
741 struct mds_group_info mgi;
742 struct llog_ctxt *ctxt;
/* uuid of the target behind the watched osc; used in messages and calls. */
750 uuid = &watched->u.cli.cl_target_uuid;
753 down_read(&mds->mds_notify_lock);
754 if (obd->obd_stopping || obd->obd_fail)
755 GOTO(out, rc = -ENODEV);
757 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
758 rc = mds_lov_update_mds(obd, watched, idx);
760 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
763 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
766 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
767 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
770 /* propagate capability keys */
771 rc = mds_propagate_capa_keys(mds);
775 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
777 GOTO(out, rc = -ENODEV);
779 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
780 rc = llog_connect(ctxt, NULL, NULL, uuid);
783 CERROR("%s failed at llog_origin_connect: %d\n",
784 obd_uuid2str(uuid), rc);
788 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
789 obd->obd_name, obd_uuid2str(uuid));
790 rc = mds_lov_clear_orphans(mds, uuid);
792 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
793 obd_uuid2str(uuid), rc);
797 if (obd->obd_upcall.onu_owner) {
799 * This is a hack for mds_notify->mdd_notify. When the mds obd
800 * in mdd is removed, This hack should be removed.
802 LASSERT(obd->obd_upcall.onu_upcall != NULL);
803 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
804 obd->obd_upcall.onu_owner);
808 up_read(&mds->mds_notify_lock);
810 /* Deactivate it for safety */
811 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
813 if (!obd->obd_stopping && mds->mds_osc_obd &&
814 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
815 obd_notify(mds->mds_osc_obd, watched,
816 OBD_NOTIFY_INACTIVE, NULL);
/* Pairs with the class_incref() taken in mds_lov_start_synchronize(). */
819 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point used by mds_lov_start_synchronize(): names the thread
 * "ll_sync_<index>" (two-digit, zero-padded target index), daemonizes it and
 * runs __mds_lov_synchronize() on the same argument.
 *
 * \param data  struct mds_lov_sync_info describing the target to sync
 */
823 int mds_lov_synchronize(void *data)
825 struct mds_lov_sync_info *mlsi = data;
828 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
829 ptlrpc_daemonize(name);
831 RETURN(__mds_lov_synchronize(data));
/*
 * Starts a synchronisation of the MDS with one target, either in a new
 * kernel thread (mds_lov_synchronize) or by calling __mds_lov_synchronize()
 * directly.
 *
 * \param obd       MDS device
 * \param watched   osc device of the target
 * \param data      points to the __u32 index of the target
 * \param nonblock  presumably selects the background-thread variant;
 *                  TODO confirm against the branch condition
 *
 * A reference on \a obd is taken before the sync starts; it is dropped here
 * when the thread cannot be started and otherwise at the end of
 * __mds_lov_synchronize().
 */
834 int mds_lov_start_synchronize(struct obd_device *obd,
835 struct obd_device *watched,
836 void *data, int nonblock)
838 struct mds_lov_sync_info *mlsi;
840 struct obd_uuid *uuid;
844 uuid = &watched->u.cli.cl_target_uuid;
/* Argument bundle handed to the sync routine. */
846 OBD_ALLOC(mlsi, sizeof(*mlsi));
851 mlsi->mlsi_obd = obd;
852 mlsi->mlsi_watched = watched;
853 mlsi->mlsi_index = *(__u32 *)data;
855 /* Although class_export_get(obd->obd_self_export) would lock
856 the MDS in place, since it's only a self-export
857 it doesn't lock the LOV in place. The LOV can be disconnected
858 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
859 Simply taking an export ref on the LOV doesn't help, because it's
860 still disconnected. Taking an obd reference insures that we don't
861 disconnect the LOV. This of course means a cleanup won't
862 finish for as long as the sync is blocking. */
863 class_incref(obd, "mds_lov_synchronize", obd);
866 /* Synchronize in the background */
867 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
868 CLONE_VM | CLONE_FILES);
/* NOTE(review): on this failure path only the obd reference is visibly
 * dropped - confirm that mlsi is released as well. */
870 CERROR("%s: error starting mds_lov_synchronize: %d\n",
872 class_decref(obd, "mds_lov_synchronize", obd);
/* NOTE(review): mlsi is read here after the thread has been started -
 * confirm the new thread cannot have released it already. */
874 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
875 "thread=%d\n", obd->obd_name,
876 mlsi->mlsi_index, rc);
/* Synchronous variant: run the sync in the caller's context. */
880 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification handler of the MDS for events about a watched device.
 *
 * \param obd      MDS device
 * \param watched  device the event is about; must be of type
 *                 LUSTRE_OSC_NAME, anything else is reported through CERROR
 * \param ev       event; the cases listed are OBD_NOTIFY_ACTIVE,
 *                 OBD_NOTIFY_SYNC, OBD_NOTIFY_SYNC_NONBLOCK and
 *                 OBD_NOTIFY_CONFIG
 * \param data     points to the __u32 lov index of the target
 *
 * While the MDS is recovering only the lov descriptor is refreshed
 * (mds_lov_update_desc) and orphans are not reset. Otherwise a synchronize
 * is started through mds_lov_start_synchronize(), whose nonblock argument
 * is zero only for OBD_NOTIFY_SYNC.
 */
886 int mds_notify(struct obd_device *obd, struct obd_device *watched,
887 enum obd_notify_event ev, void *data)
892 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
895 /* We only handle these: */
896 case OBD_NOTIFY_ACTIVE:
897 /* lov want one or more _active_ targets for work */
898 /* activate event should be pass lov idx as argument */
899 case OBD_NOTIFY_SYNC:
900 case OBD_NOTIFY_SYNC_NONBLOCK:
901 /* sync event should be pass lov idx as argument */
903 case OBD_NOTIFY_CONFIG:
/* Only osc devices are expected as the watched device. */
908 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
909 CERROR("unexpected notification of %s %s!\n",
910 watched->obd_type->typ_name, watched->obd_name);
914 if (obd->obd_recovering) {
915 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
917 obd_uuid2str(&watched->u.cli.cl_target_uuid));
918 /* We still have to fix the lov descriptor for ost's added
919 after the mdt in the config log. They didn't make it into
921 rc = mds_lov_update_desc(obd, *(__u32 *)data,
922 &watched->u.cli.cl_target_uuid);
926 rc = mds_lov_start_synchronize(obd, watched, data,
927 !(ev == OBD_NOTIFY_SYNC));