1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
53 #include "mds_internal.h"
/*
 * Debug helper: dump the dirty-page bitmap and the cached object ids held in
 * obd->u.mds to the D_INFO debug log, tagged with the caller-supplied label.
 * A NULL bitmap or a NULL page array is reported through CERROR.
 *
 * NOTE(review): this excerpt omits several original lines (braces, local
 * declarations and whatever follows each CERROR), so the exact early-exit
 * behaviour cannot be confirmed from here.
 */
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 struct mds_obd *mds = &obd->u.mds;
60 CDEBUG(D_INFO, "dump from %s\n", label);
61 if (mds->mds_lov_page_dirty == NULL) {
62 CERROR("NULL bitmap!\n");
/*
 * Print the bitmap one long word at a time.
 * NOTE(review): the bound is size/BITS_PER_LONG + 1, so when size is an
 * exact multiple of BITS_PER_LONG this visits one word more than the bits
 * require -- confirm the bitmap allocation covers that word.
 */
66 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
69 if (mds->mds_lov_page_array == NULL) {
70 CERROR("not init page array!\n");
/* Walk every page slot, then every object id slot inside that page. */
74 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75 obd_id *data = mds->mds_lov_page_array[i];
80 for(j=0; j < OBJID_PER_PAGE(); j++) {
83 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Set up the in-memory state used to track per-OST object ids:
 *  - allocate the dirty bitmap with MDS_LOV_OBJID_PAGES_COUNT bits,
 *  - allocate the array of MDS_LOV_OBJID_PAGES_COUNT page pointers,
 *  - open (creating it if needed) the LOV_OBJID file, check that it is a
 *    regular file and remember it in mds->mds_lov_objid_filp.
 *
 * Error codes visible in this excerpt: -ENOMEM when the page array cannot
 * be allocated, PTR_ERR(file) when the open fails and -ENOENT when
 * LOV_OBJID is not a regular file.  The tail of the function closes the
 * file, frees the page array and frees the bitmap; presumably these are the
 * err_open / err_free / err_free_bitmap targets, whose labels are not
 * visible here.
 */
90 int mds_lov_init_objids(struct obd_device *obd)
92 struct mds_obd *mds = &obd->u.mds;
93 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* One allocation unit must hold a whole number of object ids. */
98 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
100 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101 if (mds->mds_lov_page_dirty == NULL)
105 OBD_ALLOC(mds->mds_lov_page_array, size);
106 if (mds->mds_lov_page_array == NULL)
107 GOTO(err_free_bitmap, rc = -ENOMEM);
109 /* open and test the lov objid file */
110 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/*
 * NOTE(review): rc is logged below, but its assignment from PTR_ERR(file)
 * is not visible in this excerpt -- confirm it is set before the CERROR.
 */
113 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114 GOTO(err_free, rc = PTR_ERR(file));
116 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118 file->f_dentry->d_inode->i_mode);
119 GOTO(err_open, rc = -ENOENT);
121 mds->mds_lov_objid_filp = file;
/* Cleanup: close the file, then free the page array, then the bitmap. */
125 if (filp_close((struct file *)file, 0))
126 CERROR("can't close %s after error\n", LOV_OBJID);
128 OBD_FREE(mds->mds_lov_page_array, size);
130 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Release the object id tracking state of obd->u.mds: free the object id
 * pages and the page-pointer array (when the array exists), close the
 * LOV_OBJID file handle if one is open (clearing mds->mds_lov_objid_filp)
 * and free the dirty bitmap.
 *
 * NOTE(review): the guards around the per-page OBD_FREE and around the
 * "file won't close" CERROR are not visible in this excerpt.
 */
135 void mds_lov_destroy_objids(struct obd_device *obd)
137 struct mds_obd *mds = &obd->u.mds;
141 if (mds->mds_lov_page_array != NULL) {
142 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143 obd_id *data = mds->mds_lov_page_array[i];
145 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* The array size must match the one used at allocation time. */
147 OBD_FREE(mds->mds_lov_page_array,
148 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
151 if (mds->mds_lov_objid_filp) {
152 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
/* Drop the handle regardless of the close result. */
153 mds->mds_lov_objid_filp = NULL;
155 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
158 FREE_BITMAP(mds->mds_lov_page_dirty);
163 * There are currently two ways to learn the ost count and the max ost index:
164 * first - after an ost has connected to the mds and the sync process has finished;
165 * second - from the lmm during recovery, in case the mds does not have configs
166 * and the ost isn't registered in the mgs.
168 * \param mds pointer to mds structure
169 * \param index maximum ost index
171 * \retval -ENOMEM if there is not enough memory for a new page
172 * \retval 0 if the update succeeded
174 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* Split the OST index into a page number and a slot within that page. */
176 __u32 page = index / OBJID_PER_PAGE();
177 __u32 off = index % OBJID_PER_PAGE();
178 obd_id *data = mds->mds_lov_page_array[page];
/*
 * Allocate the page and publish it in the page array.
 * NOTE(review): the condition guarding this allocation is not visible in
 * this excerpt; presumably it only runs when the page is still NULL.
 */
181 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
185 mds->mds_lov_page_array[page] = data;
/* Remember the highest index seen so far and where it is stored. */
188 if (index > mds->mds_lov_objid_max_index) {
189 mds->mds_lov_objid_lastpage = page;
190 mds->mds_lov_objid_lastidx = off;
191 mds->mds_lov_objid_max_index = index;
194 /* workaround - New target not in objids file; increase mdsize */
195 /* ld_tgt_count is used as the max index everywhere, despite its name. */
196 if (data[off] == 0) {
/*
 * Count the new target and recompute the maximum EA and cookie sizes for
 * min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 */
200 mds->mds_lov_objid_count++;
201 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
202 mds->mds_lov_objid_count);
204 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
205 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
207 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
208 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
209 mds->mds_max_cookiesize);
/*
 * Make sure the object id tables can hold every OST referenced by the
 * striping EA lmm: for each stripe, mds_lov_update_max_ost() is called with
 * the stripe's l_ost_idx while obd->obd_dev_sem is held.
 *
 * The EA is decoded according to its little-endian lmm_magic; the two
 * layouts handled here are struct lov_mds_md_v1 and struct lov_mds_md_v3
 * (the case labels themselves are not visible in this excerpt).  Any other
 * magic is reported with CERROR.
 *
 * NOTE(review): the return statements are not visible here, so the exact
 * return values cannot be confirmed from this excerpt.
 */
216 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
218 struct lov_ost_data_v1 *data;
223 /* if we create file without objects - lmm is NULL */
227 switch (le32_to_cpu(lmm->lmm_magic)) {
229 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
230 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
233 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
234 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
237 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* Walk the stripes under obd_dev_sem; a non-zero result is a failure. */
242 mutex_down(&obd->obd_dev_sem);
243 for (j = 0; j < count; j++) {
244 __u32 i = le32_to_cpu(data[j].l_ost_idx);
245 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
250 mutex_up(&obd->obd_dev_sem);
254 EXPORT_SYMBOL(mds_lov_prepare_objids);
/*
 * Record the object ids found in the striping EA lmm as candidates for the
 * "last object id" of each OST.  For every stripe the OST index is split
 * into a page and a slot; when the stripe's object id is greater than the
 * cached value, the page is marked dirty in mds_lov_page_dirty.
 *
 * The EA is decoded as struct lov_mds_md_v1 or struct lov_mds_md_v3
 * depending on lmm_magic (case labels not visible in this excerpt); any
 * other magic is reported with CERROR.
 *
 * NOTE(review): the statement that stores the new id into data[idx], any
 * NULL check on the page, and any locking are not visible here -- confirm
 * against the full source.
 */
256 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
258 struct mds_obd *mds = &obd->u.mds;
260 struct lov_ost_data_v1 *obj;
264 /* if we create file without objects - lmm is NULL */
268 switch (le32_to_cpu(lmm->lmm_magic)) {
270 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
271 obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
274 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
275 obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
278 CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
282 for (j = 0; j < count; j++) {
/* On-disk fields are little-endian; convert before use. */
283 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
284 obd_id id = le64_to_cpu(obj[j].l_object_id);
285 __u32 page = i / OBJID_PER_PAGE();
286 __u32 idx = i % OBJID_PER_PAGE();
289 data = mds->mds_lov_page_array[page];
291 CDEBUG(D_INODE,"update last object for ost %u"
292 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
/* Only a larger id counts as an update; the page then needs writing. */
293 if (id > data[idx]) {
295 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
301 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Update the MDS counters after object ids have been read: loop over count
 * entries, bump mds->mds_lov_objid_count, then recompute mds_max_mdsize and
 * mds_max_cookiesize for min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count)
 * stripes.
 *
 * NOTE(review): the rest of the parameter list, the condition under which
 * the counter is incremented and the return value are not visible in this
 * excerpt.
 */
304 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
310 for(i = 0; i < count; i++) {
314 mds->mds_lov_objid_count++;
317 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
318 mds->mds_lov_objid_count);
320 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
321 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
323 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
324 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Load the object ids stored in the LOV_OBJID file into freshly allocated
 * pages of mds->mds_lov_page_array, feed each page to
 * mds_lov_update_from_read(), and finally record the last page / last slot
 * in mds_lov_objid_lastpage and mds_lov_objid_lastidx.
 *
 * Error codes visible in this excerpt: -ENOMEM when a page cannot be
 * allocated and -EIO when mds_lov_update_from_read() fails.
 */
330 static int mds_lov_read_objids(struct obd_device *obd)
332 struct mds_obd *mds = &obd->u.mds;
334 int i, rc, count = 0, page = 0;
338 /* Read everything in the file, even if our current lov desc
339 has fewer targets. Old targets not in the lov descriptor
340 during mds setup may still have valid objids. */
341 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/*
 * Number of pages to read: full pages in the file plus one.
 * NOTE(review): when size is an exact multiple of the page payload this
 * allocates and reads one extra page -- confirm that is intended.
 */
345 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
346 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
348 for (i = 0; i < page; i++) {
349 loff_t off_old = off;
/* Each slot must be empty before it is populated. */
351 LASSERT(mds->mds_lov_page_array[i] == NULL);
352 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
353 if (mds->mds_lov_page_array[i] == NULL)
354 GOTO(out, rc = -ENOMEM);
356 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
357 OBJID_PER_PAGE()*sizeof(obd_id), &off);
359 CERROR("Error reading objids %d\n", rc);
/*
 * The number of ids read is derived from how far the file offset moved.
 * NOTE(review): count accumulates across pages but is passed below
 * together with the buffer of the current page only -- confirm that
 * mds_lov_update_from_read() cannot index past this page.
 */
363 count += (off - off_old)/sizeof(obd_id);
364 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
365 CERROR("Can't update mds data\n");
366 GOTO(out, rc = -EIO);
372 mds->mds_lov_objid_lastpage = i;
373 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
375 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
376 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
378 mds_lov_dump_objids("read",obd);
/*
 * Write every page flagged in mds->mds_lov_page_dirty to the LOV_OBJID
 * file.  Page i is written at byte offset i * (page payload size); for the
 * page recorded as mds_lov_objid_lastpage only the first
 * mds_lov_objid_lastidx + 1 ids are written.  The dirty bit of a page is
 * cleared after the write.
 *
 * NOTE(review): what happens when the bitmap is empty, the remaining
 * arguments of fsfilt_write_record(), the handling of its result and the
 * return value are not visible in this excerpt.
 */
383 int mds_lov_write_objids(struct obd_device *obd)
385 struct mds_obd *mds = &obd->u.mds;
389 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
392 mds_lov_dump_objids("write", obd);
394 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
395 obd_id *data = mds->mds_lov_page_array[i];
396 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
397 loff_t off = i * size;
/* A dirty bit without a backing page would be an internal error. */
399 LASSERT(data != NULL);
401 /* check for partially filled last page */
402 if (i == mds->mds_lov_objid_lastpage)
403 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
405 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
409 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
416 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure a last object id is known for target index idx.  The index is
 * split into a page and a slot; when the cached slot is still 0 the value
 * is requested from the OSC export via obd_get_info(KEY_LAST_ID), with
 * lastid.data pointing at the slot, and the page is marked dirty.
 *
 * NOTE(review): the rest of the parameter list, the remaining fields of
 * lastid, the handling of rc and the return value are not visible in this
 * excerpt.
 */
418 static int mds_lov_get_objid(struct obd_device * obd,
421 struct mds_obd *mds = &obd->u.mds;
428 page = idx / OBJID_PER_PAGE();
429 off = idx % OBJID_PER_PAGE();
430 data = mds->mds_lov_page_array[page];
431 if (data[off] == 0) {
432 /* We never read this lastid; ask the osc */
433 struct obd_id_info lastid;
434 __u32 size = sizeof(lastid);
/* The reply is meant to land directly in the cached slot. */
437 lastid.data = &data[off];
438 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
439 KEY_LAST_ID, &size, &lastid, NULL);
443 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Ask the OSTs behind mds->mds_osc_exp to reconcile their objects with the
 * MDS by issuing an obd_create() whose obdo carries OBD_FL_DELORPHAN and
 * the object group of this MDS.  When ost_uuid is not NULL it is passed
 * along in oti.oti_ost_uuid.
 *
 * Requires mds->mds_lov_page_array to be set up (asserted).
 *
 * NOTE(review): the declaration of oa, the handling of rc and the return
 * value are not visible in this excerpt.
 */
449 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
453 struct obd_trans_info oti = {0};
454 struct lov_stripe_md *empty_ea = NULL;
457 LASSERT(mds->mds_lov_page_array != NULL);
459 /* This create will in fact either create or destroy: If the OST is
460 * missing objects below this ID, they will be created. If it finds
461 * objects above this ID, they will be removed. */
462 memset(&oa, 0, sizeof(oa));
463 oa.o_flags = OBD_FL_DELORPHAN;
464 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
/* Mark only the flags and group fields of the obdo as valid. */
465 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
466 if (ost_uuid != NULL)
467 oti.oti_ost_uuid = ost_uuid;
468 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send a KEY_NEXT_ID request to the OSC export through
 * obd_set_info_async(), passing a struct obd_id_info.  Must not be called
 * while the obd is recovering (asserted).  A failure is logged with CERROR.
 *
 * NOTE(review): how info is filled from idx and id, the condition around
 * the CERROR and the return value are not visible in this excerpt.
 */
474 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
476 struct mds_obd *mds = &obd->u.mds;
478 struct obd_id_info info;
481 LASSERT(!obd->obd_recovering);
485 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
486 KEY_NEXT_ID, sizeof(info), &info, NULL);
488 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
494 /* Update the lov desc for a new size lov.
 *
 * Steps visible in this excerpt, in order:
 *  - allocate a temporary descriptor ld and fetch KEY_LOVDESC from the OSC
 *    export with obd_get_info();
 *  - copy *ld into mds->mds_lov_desc;
 *  - call mds_lov_update_max_ost() for idx while holding obd->obd_dev_sem;
 *  - call llog_cat_initialize() for the target idx / uuid;
 *  - if an upcall owner is registered, invoke the upcall with
 *    OBD_NOTIFY_ACTIVE;
 *  - free ld.
 *
 * NOTE(review): the error checks between these steps, the remaining
 * arguments of the obd_get_info() call and the return value are not
 * visible in this excerpt.
 */
495 static int mds_lov_update_desc(struct obd_device *obd, int idx,
496 struct obd_uuid *uuid)
498 struct mds_obd *mds = &obd->u.mds;
500 __u32 valsize = sizeof(mds->mds_lov_desc);
504 OBD_ALLOC(ld, sizeof(*ld));
508 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
513 /* Don't change the mds_lov_desc until the objids size matches the
515 mds->mds_lov_desc = *ld;
516 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
517 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
519 mutex_down(&obd->obd_dev_sem);
520 rc = mds_lov_update_max_ost(mds, idx);
521 mutex_up(&obd->obd_dev_sem);
526 /* If we added a target we have to reconnect the llogs */
527 /* We only _need_ to do this at first add (idx), or the first time
528 after recovery. However, it should now be safe to call anytime. */
529 rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
533 /*XXX this notifies the MDD until lov handling use old mds code */
534 if (obd->obd_upcall.onu_owner) {
535 LASSERT(obd->obd_upcall.onu_upcall != NULL);
536 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
537 obd->obd_upcall.onu_owner);
540 OBD_FREE(ld, sizeof(*ld));
544 /* Inform MDS about new/updated target
 *
 * Refreshes the lov descriptor through mds_lov_update_desc() using the
 * target uuid of the watched device, then rejects an idx that is not below
 * mds_lov_desc.ld_tgt_count with -EINVAL, makes sure a last object id is
 * cached via mds_lov_get_objid() and pushes it to the OSC with
 * mds_lov_set_one_nextid().
 *
 * NOTE(review): the action taken when obd->obd_recovering is set, the
 * checks on the intermediate rc values, the rest of the parameter list and
 * the return value are not visible in this excerpt.
 */
545 static int mds_lov_update_mds(struct obd_device *obd,
546 struct obd_device *watched,
549 struct mds_obd *mds = &obd->u.mds;
557 /* Don't let anyone else mess with mds_lov_objids now */
558 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
562 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
563 idx, obd->obd_recovering, obd->obd_async_recov,
564 mds->mds_lov_desc.ld_tgt_count);
566 /* idx is set as data from lov_notify. */
567 if (obd->obd_recovering)
/* The index must address an existing target of the descriptor. */
570 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
571 CERROR("index %d > count %d!\n", idx,
572 mds->mds_lov_desc.ld_tgt_count);
573 GOTO(out, rc = -EINVAL);
576 rc = mds_lov_get_objid(obd, idx);
/* Locate the cached id for this target: page first, then slot. */
580 page = idx / OBJID_PER_PAGE();
581 off = idx % OBJID_PER_PAGE();
582 data = mds->mds_lov_page_array[page];
584 /* We have read this lastid from disk; tell the osc.
585 Don't call this during recovery. */
586 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
588 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
589 /* Don't abort the rest of the sync */
592 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
599 /* update the LOV-OSC knowledge of the last used object id's
 *
 * Connects the MDS obd to the LOV device named lov_name.  Steps visible in
 * this excerpt:
 *  - if mds->mds_osc_obd already holds an error pointer, return that error;
 *  - look the LOV up by name; if it is missing, log it and store
 *    ERR_PTR(-ENOTCONN) in mds->mds_osc_obd;
 *  - read the stored object ids with mds_lov_read_objids() while holding
 *    obd->obd_dev_sem;
 *  - register obd as observer of the LOV;
 *  - mark the LOV with lov_sp_me = LUSTRE_SP_MDT;
 *  - build the connect data (flags, version, object group), call
 *    obd_connect() and free the connect data again;
 *  - on connect failure log it and store ERR_PTR(rc) in mds->mds_osc_obd;
 *  - otherwise store the export from class_conn2export() in
 *    mds->mds_osc_exp.
 * The last two statements reset mds_osc_exp to NULL and mds_osc_obd to
 * ERR_PTR(rc); presumably they belong to an error label that is not
 * visible here.
 *
 * NOTE(review): the action taken when mds_osc_obd is already set, the
 * checks on the intermediate rc values and the return statements are not
 * visible in this excerpt.
 */
600 int mds_lov_connect(struct obd_device *obd, char * lov_name)
602 struct mds_obd *mds = &obd->u.mds;
603 struct lustre_handle conn = {0,};
604 struct obd_connect_data *data;
608 if (IS_ERR(mds->mds_osc_obd))
609 RETURN(PTR_ERR(mds->mds_osc_obd));
611 if (mds->mds_osc_obd)
614 mds->mds_osc_obd = class_name2obd(lov_name);
615 if (!mds->mds_osc_obd) {
616 CERROR("MDS cannot locate LOV %s\n", lov_name);
617 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* The object id tables are read under obd_dev_sem. */
621 mutex_down(&obd->obd_dev_sem);
622 rc = mds_lov_read_objids(obd);
623 mutex_up(&obd->obd_dev_sem);
625 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
629 rc = obd_register_observer(mds->mds_osc_obd, obd);
631 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
636 mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
/* The connect data is only needed for the duration of obd_connect(). */
638 OBD_ALLOC(data, sizeof(*data));
641 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
642 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
643 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
644 OBD_CONNECT_AT | OBD_CONNECT_CHANGE_QS;
645 #ifdef HAVE_LRU_RESIZE_SUPPORT
646 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
648 data->ocd_version = LUSTRE_VERSION_CODE;
649 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
650 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
651 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
652 OBD_FREE(data, sizeof(*data));
654 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
655 mds->mds_osc_obd = ERR_PTR(rc);
658 mds->mds_osc_exp = class_conn2export(&conn);
660 /* I want to see a callback happen when the OBD moves to a
661 * "For General Use" state, and that's when we'll call
662 * set_nextid(). The class driver can help us here, because
663 * it can use the obd_recovering flag to determine when the
664 * the OBD is full available. */
665 /* MDD device will care about that
666 if (!obd->obd_recovering)
667 rc = mds_postrecov(obd);
672 mds->mds_osc_exp = NULL;
673 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detach the MDS from its LOV: when mds_osc_obd is a valid pointer and an
 * export exists, unregister the observer (by registering NULL) and set
 * obd_force on the LOV device.
 *
 * NOTE(review): any further teardown and the return value are not visible
 * in this excerpt.
 */
677 int mds_lov_disconnect(struct obd_device *obd)
679 struct mds_obd *mds = &obd->u.mds;
683 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
684 obd_register_observer(mds->mds_osc_obd, NULL);
686 /* The actual disconnect of the mds_lov will be called from
687 * class_disconnect_exports from mds_lov_clean. So we have to
688 * ensure that class_cleanup doesn't fail due to the extra ref
689 * we're holding now. The mechanism to do that already exists -
690 * the obd_force flag. We'll drop the final ref to the
691 * mds_osc_exp in mds_cleanup. */
692 mds->mds_osc_obd->obd_force = 1;
/*
 * Argument bundle handed to mds_lov_synchronize() /
 * __mds_lov_synchronize(); it is filled in by mds_lov_start_synchronize().
 */
698 struct mds_lov_sync_info {
699 struct obd_device *mlsi_obd; /* the lov device to sync */
700 struct obd_device *mlsi_watched; /* target osc */
701 __u32 mlsi_index; /* index of target */
/*
 * Send the capability keys of this MDS to the OSC export: each of the two
 * entries of mds->mds_capa_keys is passed to obd_set_info_async() under
 * KEY_CAPA_KEY.  A failure is reported through DEBUG_CAPA_KEY at D_ERROR
 * level.
 *
 * NOTE(review): the action taken when mds_capa_keys is NULL, whether the
 * loop stops on the first failure and the return value are not visible in
 * this excerpt.
 */
704 static int mds_propagate_capa_keys(struct mds_obd *mds)
706 struct lustre_capa_key *key;
711 if (!mds->mds_capa_keys)
/* The loop bound is hard-coded to two keys. */
714 for (i = 0; i < 2; i++) {
715 key = &mds->mds_capa_keys[i];
716 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
718 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
719 KEY_CAPA_KEY, sizeof(*key), key, NULL);
721 DEBUG_CAPA_KEY(D_ERROR, key,
722 "propagate failed (rc = %d) for", rc);
730 /* We only sync one osc at a time, so that we don't have to hold
731 any kind of lock on the whole mds_lov_desc, which may change
732 (grow) as a result of mds_lov_add_ost. This also avoids any
733 kind of mismatch between the lov_desc and the mds_lov_desc,
734 which are not in lock-step during lov_add_obd */
/*
 * Synchronize the MDS with one target.  data is a struct
 * mds_lov_sync_info.  Steps visible in this excerpt, performed while
 * holding mds->mds_notify_lock for reading:
 *  - give up with -ENODEV when the obd is stopping or failing;
 *  - mds_lov_update_mds() for the target index;
 *  - send KEY_MDS_CONN with the object group of this MDS to the OSC export;
 *  - propagate the capability keys;
 *  - fetch the LLOG_MDS_OST_ORIG_CTXT llog context and llog_connect() it
 *    for the target uuid;
 *  - mds_lov_clear_orphans() for the target uuid;
 *  - if an upcall owner is registered, invoke the upcall with
 *    OBD_NOTIFY_ACTIVE.
 * After the lock is dropped, the failure path logs the error and, unless
 * one of the involved devices is stopping, notifies the LOV with
 * OBD_NOTIFY_INACTIVE for the watched target.  The obd reference named
 * "mds_lov_synchronize" is dropped with class_decref().
 *
 * NOTE(review): the checks on the intermediate rc values, the release of
 * the llog context and of mlsi, and the return value are not visible in
 * this excerpt.
 */
735 static int __mds_lov_synchronize(void *data)
737 struct mds_lov_sync_info *mlsi = data;
738 struct obd_device *obd = mlsi->mlsi_obd;
739 struct obd_device *watched = mlsi->mlsi_watched;
740 struct mds_obd *mds = &obd->u.mds;
741 struct obd_uuid *uuid;
742 __u32 idx = mlsi->mlsi_index;
743 struct mds_group_info mgi;
744 struct llog_ctxt *ctxt;
752 uuid = &watched->u.cli.cl_target_uuid;
755 down_read(&mds->mds_notify_lock);
756 if (obd->obd_stopping || obd->obd_fail)
757 GOTO(out, rc = -ENODEV);
759 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
760 rc = mds_lov_update_mds(obd, watched, idx);
762 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
765 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
768 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
769 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
772 /* propagate capability keys */
773 rc = mds_propagate_capa_keys(mds);
777 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
779 GOTO(out, rc = -ENODEV);
781 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
782 rc = llog_connect(ctxt, NULL, NULL, uuid);
785 CERROR("%s failed at llog_origin_connect: %d\n",
786 obd_uuid2str(uuid), rc);
790 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
791 obd->obd_name, obd_uuid2str(uuid));
792 rc = mds_lov_clear_orphans(mds, uuid);
794 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
795 obd_uuid2str(uuid), rc);
799 if (obd->obd_upcall.onu_owner) {
801 * This is a hack for mds_notify->mdd_notify. When the mds obd
802 * in mdd is removed, This hack should be removed.
804 LASSERT(obd->obd_upcall.onu_upcall != NULL);
805 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
806 obd->obd_upcall.onu_owner);
810 up_read(&mds->mds_notify_lock);
812 /* Deactivate it for safety */
813 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
815 if (!obd->obd_stopping && mds->mds_osc_obd &&
816 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
817 obd_notify(mds->mds_osc_obd, watched,
818 OBD_NOTIFY_INACTIVE, NULL);
821 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background sync: names the thread
 * "ll_sync_<index>" (index printed with at least two digits), daemonizes
 * it with ptlrpc_daemonize() and then runs __mds_lov_synchronize() on the
 * same argument, returning its result.
 */
825 int mds_lov_synchronize(void *data)
827 struct mds_lov_sync_info *mlsi = data;
830 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
831 ptlrpc_daemonize(name);
833 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronizing the MDS with the watched target.  data is read as a
 * pointer to the __u32 target index.  A struct mds_lov_sync_info is
 * allocated and filled in, an obd reference named "mds_lov_synchronize" is
 * taken, and the sync is then either started in a kernel thread running
 * mds_lov_synchronize() or performed directly through
 * __mds_lov_synchronize().
 *
 * If the thread cannot be started, the error is logged and the obd
 * reference is dropped again.
 *
 * NOTE(review): the test that selects between the two paths (presumably
 * on nonblock), the allocation-failure handling and the return value are
 * not visible in this excerpt.
 */
836 int mds_lov_start_synchronize(struct obd_device *obd,
837 struct obd_device *watched,
838 void *data, int nonblock)
840 struct mds_lov_sync_info *mlsi;
842 struct obd_uuid *uuid;
846 uuid = &watched->u.cli.cl_target_uuid;
848 OBD_ALLOC(mlsi, sizeof(*mlsi));
853 mlsi->mlsi_obd = obd;
854 mlsi->mlsi_watched = watched;
855 mlsi->mlsi_index = *(__u32 *)data;
857 /* Although class_export_get(obd->obd_self_export) would lock
858 the MDS in place, since it's only a self-export
859 it doesn't lock the LOV in place. The LOV can be disconnected
860 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
861 Simply taking an export ref on the LOV doesn't help, because it's
862 still disconnected. Taking an obd reference insures that we don't
863 disconnect the LOV. This of course means a cleanup won't
864 finish for as long as the sync is blocking. */
865 class_incref(obd, "mds_lov_synchronize", obd);
868 /* Synchronize in the background */
869 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
870 CLONE_VM | CLONE_FILES);
872 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* The thread never started, so give back the reference taken above. */
874 class_decref(obd, "mds_lov_synchronize", obd);
876 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
877 "thread=%d\n", obd->obd_name,
878 mlsi->mlsi_index, rc);
/* Direct path: run the sync in the caller's context. */
882 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification handler of the MDS for events about a watched device.  The
 * event labels visible here are OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC,
 * OBD_NOTIFY_SYNC_NONBLOCK and OBD_NOTIFY_CONFIG.  A watched device whose
 * type name is not LUSTRE_OSC_NAME is reported with CERROR.
 *
 * While the obd is recovering, only the lov descriptor is refreshed through
 * mds_lov_update_desc(), with data read as a pointer to the __u32 target
 * index.  The final statement starts a synchronization through
 * mds_lov_start_synchronize(), passing a non-zero nonblock argument for
 * every event other than OBD_NOTIFY_SYNC.
 *
 * NOTE(review): the switch statement itself, what each event label does,
 * the code following the CERROR and the return value are not visible in
 * this excerpt, and the function may continue past its last visible line.
 */
888 int mds_notify(struct obd_device *obd, struct obd_device *watched,
889 enum obd_notify_event ev, void *data)
894 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
897 /* We only handle these: */
898 case OBD_NOTIFY_ACTIVE:
899 /* lov want one or more _active_ targets for work */
900 /* activate event should be pass lov idx as argument */
901 case OBD_NOTIFY_SYNC:
902 case OBD_NOTIFY_SYNC_NONBLOCK:
903 /* sync event should be pass lov idx as argument */
905 case OBD_NOTIFY_CONFIG:
910 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
911 CERROR("unexpected notification of %s %s!\n",
912 watched->obd_type->typ_name, watched->obd_name);
916 if (obd->obd_recovering) {
917 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
919 obd_uuid2str(&watched->u.cli.cl_target_uuid));
920 /* We still have to fix the lov descriptor for ost's added
921 after the mdt in the config log. They didn't make it into
923 rc = mds_lov_update_desc(obd, *(__u32 *)data,
924 &watched->u.cli.cl_target_uuid);
928 rc = mds_lov_start_synchronize(obd, watched, data,
929 !(ev == OBD_NOTIFY_SYNC));