1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: dump the cached lov_objid state of an MDS at D_INFO level.
 * Logs the words of the mds_lov_page_dirty bitmap, then walks the slots of
 * mds_lov_page_array and prints per-page object ids.  A NULL bitmap or a
 * NULL page array is reported with CERROR.
 *
 * \param label  caller tag printed in the "dump from" line
 * \param obd    obd device whose u.mds state is dumped
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
/* NOTE(review): the loop visits size / BITS_PER_LONG + 1 words; confirm
 * that data[] really holds that many words when size is an exact
 * multiple of BITS_PER_LONG. */
67 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i,
69 mds->mds_lov_page_dirty->data[i]);
71 if (mds->mds_lov_page_array == NULL) {
72 CERROR("not init page array!\n");
/* Outer loop: every page slot; inner loop: every objid slot of a page. */
76 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77 obd_id *data = mds->mds_lov_page_array[i];
82 for(j=0; j < OBJID_PER_PAGE(); j++) {
85 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the in-memory lov_objid bookkeeping of an MDS.
 * Allocates the dirty-page bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the
 * page pointer array (MDS_LOV_OBJID_PAGES_COUNT pointers), then opens or
 * creates the LOV_OBJID file and stores it in mds_lov_objid_filp.
 * The error labels undo these steps: close the file, free the page array,
 * free the bitmap.
 *
 * \param obd  obd device whose u.mds state is initialised
 *
 * \retval -ENOMEM       the page array could not be allocated
 * \retval PTR_ERR(file) filp_open() failed
 * \retval -ENOENT       LOV_OBJID exists but is not a regular file
 */
93 int mds_lov_init_objids(struct obd_device *obd)
95 struct mds_obd *mds = &obd->u.mds;
96 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* Compile-time check: a page buffer holds a whole number of obd_id slots. */
101 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
103 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104 if (mds->mds_lov_page_dirty == NULL)
108 OBD_ALLOC(mds->mds_lov_page_array, size);
109 if (mds->mds_lov_page_array == NULL)
110 GOTO(err_free_bitmap, rc = -ENOMEM);
112 /* open and test the lov objid file */
113 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed here, yet the only visible assignment of
 * PTR_ERR(file) is in the GOTO below; confirm rc already holds the error
 * code when this message is emitted. */
116 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117 GOTO(err_free, rc = PTR_ERR(file));
119 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121 file->f_dentry->d_inode->i_mode);
122 GOTO(err_open, rc = -ENOENT);
124 mds->mds_lov_objid_filp = file;
/* Error unwinding: release resources in reverse order of acquisition. */
128 if (filp_close((struct file *)file, 0))
129 CERROR("can't close %s after error\n", LOV_OBJID);
131 OBD_FREE(mds->mds_lov_page_array, size);
133 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Release the lov_objid bookkeeping of an MDS.
 * Frees the page buffers referenced by mds_lov_page_array and the array
 * itself, closes mds_lov_objid_filp (clearing the pointer and logging a
 * close failure with CERROR), and frees the dirty-page bitmap.
 *
 * \param obd  obd device whose u.mds state is torn down
 */
138 void mds_lov_destroy_objids(struct obd_device *obd)
140 struct mds_obd *mds = &obd->u.mds;
144 if (mds->mds_lov_page_array != NULL) {
145 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146 obd_id *data = mds->mds_lov_page_array[i];
/* Each page buffer has the fixed size MDS_LOV_ALLOC_SIZE. */
148 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
150 OBD_FREE(mds->mds_lov_page_array,
151 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
154 if (mds->mds_lov_objid_filp) {
155 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
/* The pointer is cleared right after the close call. */
156 mds->mds_lov_objid_filp = NULL;
158 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
161 FREE_BITMAP(mds->mds_lov_page_dirty);
166 * There are currently two ways to learn the ost count and the max ost index:
167 * first - after an ost is connected to the mds and the sync process has finished;
168 * second - from the lmm during recovery, in case the mds does not have configs
169 * and the ost isn't registered in the mgs.
171 * \param mds pointer to mds structure
172 * \param index maximum ost index
174 * \retval -ENOMEM if there is not enough memory for a new page
175 * \retval 0 if the update succeeded
/*
 * Account for OST index `index` in the cached objid pages.
 * The index is split into a page number and an offset inside that page.
 * A page buffer of MDS_LOV_ALLOC_SIZE bytes is allocated and stored in
 * mds_lov_page_array[page] (the guarding condition is not visible in this
 * listing; presumably only when the slot is still empty).  When `index`
 * exceeds mds_lov_objid_max_index the last page/last index/max index
 * fields are advanced.  When the slot for this index holds 0, the objid
 * count is incremented and mds_max_mdsize / mds_max_cookiesize are
 * recomputed for min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 *
 * NOTE(review): mds_lov_page_array[page] is read before any range check
 * visible here; confirm callers keep `index` below
 * MDS_LOV_OBJID_PAGES_COUNT * OBJID_PER_PAGE().
 */
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
179 __u32 page = index / OBJID_PER_PAGE();
180 __u32 off = index % OBJID_PER_PAGE();
181 obd_id *data = mds->mds_lov_page_array[page];
184 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
188 mds->mds_lov_page_array[page] = data;
/* Track the highest OST index seen so far and where it lives. */
191 if (index > mds->mds_lov_objid_max_index) {
192 mds->mds_lov_objid_lastpage = page;
193 mds->mds_lov_objid_lastidx = off;
194 mds->mds_lov_objid_max_index = index;
197 /* workaround - New target not in objids file; increase mdsize */
198 /* ld_tgt_count is used as the max index everywhere, despite its name. */
199 if (data[off] == 0) {
203 mds->mds_lov_objid_count++;
204 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205 mds->mds_lov_objid_count);
207 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212 mds->mds_max_cookiesize);
/*
 * Make sure the objid cache has room for every OST referenced by a
 * striping descriptor.
 * The stripe count and the object array are taken from the lmm according
 * to its magic (v1 or v3 layout); an unknown magic is reported with
 * CERROR.  With obd_dev_sem held, mds_lov_update_max_ost() is called for
 * the OST index of each stripe.
 *
 * \param obd  MDS obd device
 * \param lmm  on-disk striping descriptor (little-endian fields); per the
 *             comment below it is NULL for a file created without objects
 */
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
221 struct lov_ost_data_v1 *data;
226 /* if we create file without objects - lmm is NULL */
230 switch (le32_to_cpu(lmm->lmm_magic)) {
232 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
236 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
240 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* obd_dev_sem is held around the per-stripe updates. */
245 mutex_down(&obd->obd_dev_sem);
246 for (j = 0; j < count; j++) {
247 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
253 mutex_up(&obd->obd_dev_sem);
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
/*
 * Fold the object ids of a striping descriptor into the objid cache.
 * The stripe count and object array are picked according to the lmm magic
 * (v1 or v3 layout); an unknown magic is reported with CERROR.  For every
 * stripe the OST index selects a page and a slot; when the stripe's object
 * id is larger than the cached value, the page is flagged in
 * mds_lov_page_dirty.
 *
 * \param obd  MDS obd device
 * \param lmm  on-disk striping descriptor (little-endian fields); per the
 *             comment below it is NULL for a file created without objects
 */
259 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
261 struct mds_obd *mds = &obd->u.mds;
263 struct lov_ost_data_v1 *obj;
267 /* if we create file without objects - lmm is NULL */
271 switch (le32_to_cpu(lmm->lmm_magic)) {
273 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
274 obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
277 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
278 obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
281 CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
285 for (j = 0; j < count; j++) {
/* i is the OST index, id the object id recorded for this stripe. */
286 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
287 obd_id id = le64_to_cpu(obj[j].l_object_id);
288 __u32 page = i / OBJID_PER_PAGE();
289 __u32 idx = i % OBJID_PER_PAGE();
292 data = mds->mds_lov_page_array[page];
294 CDEBUG(D_INODE,"update last object for ost %u"
295 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
/* Only a larger id marks the page dirty. */
296 if (id > data[idx]) {
298 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
304 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Update MDS size limits after objids have been read from disk.
 * Iterates over `count` entries (the parameter itself is declared on a
 * line missing from this listing), incrementing mds_lov_objid_count inside
 * the loop (the per-entry condition is not visible here), then recomputes
 * mds_max_mdsize and mds_max_cookiesize for
 * min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 *
 * \param mds   MDS state to update
 * \param data  objid buffer that was read
 */
306 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
312 for(i = 0; i < count; i++) {
316 mds->mds_lov_objid_count++;
319 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
320 mds->mds_lov_objid_count);
322 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
323 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
325 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
326 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Load the LOV_OBJID file into the in-memory page array.
 * The number of pages is derived from the file size.  For each page a
 * buffer of MDS_LOV_ALLOC_SIZE bytes is allocated (the slot is asserted to
 * be empty first), filled with fsfilt_read_record(), and handed to
 * mds_lov_update_from_read().  Afterwards mds_lov_objid_lastpage and
 * mds_lov_objid_lastidx are set and the result is dumped for debugging.
 *
 * \param obd  MDS obd device
 *
 * \retval -ENOMEM  a page buffer could not be allocated
 * \retval -EIO     mds_lov_update_from_read() reported a failure
 */
332 static int mds_lov_read_objids(struct obd_device *obd)
334 struct mds_obd *mds = &obd->u.mds;
336 int i, rc = 0, count = 0, page = 0;
340 /* Read everything in the file, even if our current lov desc
341 has fewer targets. Old targets not in the lov descriptor
342 during mds setup may still have valid objids. */
343 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* One extra page is always added, so an exact multiple of the page
 * payload still yields a trailing page. */
347 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
348 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
349 for (i = 0; i < page; i++) {
351 loff_t off_old = off;
353 LASSERT(mds->mds_lov_page_array[i] == NULL);
354 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
355 if (mds->mds_lov_page_array[i] == NULL)
356 GOTO(out, rc = -ENOMEM);
358 data = mds->mds_lov_page_array[i];
360 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
361 OBJID_PER_PAGE()*sizeof(obd_id), &off);
363 CERROR("Error reading objids %d\n", rc);
/* NOTE(review): count accumulates over all pages read so far, but it is
 * passed together with the buffer of the current page only.  Confirm that
 * mds_lov_update_from_read() cannot index past the page (and does not
 * count earlier entries twice) once more than one page has been read. */
367 count += (off - off_old) / sizeof(obd_id);
368 if (mds_lov_update_from_read(mds, data, count)) {
369 CERROR("Can't update mds data\n");
370 GOTO(out, rc = -EIO);
376 mds->mds_lov_objid_lastpage = i;
377 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
379 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
380 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
382 mds_lov_dump_objids("read",obd);
/*
 * Write the dirty objid pages back to the LOV_OBJID file.
 * The dirty bitmap is tested for emptiness first.  For every bit set in
 * mds_lov_page_dirty the matching page is written with
 * fsfilt_write_record() at file offset page-number * full-page-size; the
 * last page is written only up to mds_lov_objid_lastidx.  The dirty bit is
 * cleared afterwards (the surrounding condition is not visible in this
 * listing).
 *
 * \param obd  MDS obd device
 */
387 int mds_lov_write_objids(struct obd_device *obd)
389 struct mds_obd *mds = &obd->u.mds;
393 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
396 mds_lov_dump_objids("write", obd);
398 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
399 obd_id *data = mds->mds_lov_page_array[i];
400 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
401 loff_t off = i * size;
/* A dirty bit without a page buffer is treated as a fatal inconsistency. */
403 LASSERT(data != NULL);
405 /* check for a partially filled last page */
406 if (i == mds->mds_lov_objid_lastpage)
407 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
409 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
410 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
414 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
421 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Obtain the last object id for one OST index.
 * The index (named idx in the body; its declaration is on a line missing
 * from this listing) selects a page and an offset in mds_lov_page_array.
 * In the "never read this lastid" case the value is requested from the osc
 * with obd_get_info(KEY_LAST_ID), with the cache slot &data[off] as the
 * destination, and the page is marked dirty.
 *
 * \param obd  MDS obd device
 */
423 static int mds_lov_get_objid(struct obd_device * obd,
426 struct mds_obd *mds = &obd->u.mds;
433 page = idx / OBJID_PER_PAGE();
434 off = idx % OBJID_PER_PAGE();
435 data = mds->mds_lov_page_array[page];
437 /* We never read this lastid; ask the osc */
438 struct obd_id_info lastid;
439 __u32 size = sizeof(lastid);
/* The reply is stored directly into the cache slot. */
442 lastid.data = &data[off];
443 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
444 KEY_LAST_ID, &size, &lastid, NULL);
448 /* workaround for clean filter */
452 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
454 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
455 idx, data, page, off, data[off]);
/*
 * Ask the OSTs behind mds_osc_exp to reconcile orphan objects.
 * Issues obd_create() with an obdo that carries only the
 * OBD_FL_DELORPHAN flag and the object group of this MDS.
 *
 * \param mds       MDS state; its page array must already exist (asserted)
 * \param ost_uuid  when non-NULL it is passed on in oti.oti_ost_uuid
 */
460 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
464 struct obd_trans_info oti = {0};
465 struct lov_stripe_md *empty_ea = NULL;
468 LASSERT(mds->mds_lov_page_array != NULL);
470 /* This create will in fact either create or destroy: If the OST is
471 * missing objects below this ID, they will be created. If it finds
472 * objects above this ID, they will be removed. */
473 memset(&oa, 0, sizeof(oa));
474 oa.o_flags = OBD_FL_DELORPHAN;
475 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
/* Only the flags and group fields are declared valid. */
476 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
477 if (ost_uuid != NULL)
478 oti.oti_ost_uuid = ost_uuid;
480 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send the next object id for one OST to the osc layer.
 * Uses obd_set_info_async() with KEY_NEXT_ID and an obd_id_info payload;
 * a failure is logged with CERROR.  Must not be called while the obd is
 * recovering (asserted).
 *
 * \param obd  MDS obd device
 * \param idx  OST index
 * \param id   pointer to the object id for that OST
 */
486 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
488 struct mds_obd *mds = &obd->u.mds;
490 struct obd_id_info info;
493 LASSERT(!obd->obd_recovering);
497 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
498 KEY_NEXT_ID, sizeof(info), &info, NULL);
500 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds_lov_desc after a target was added and propagate the change.
 * Steps visible here: allocate a temporary lov descriptor, fetch it with
 * obd_get_info(KEY_LOVDESC), copy it into mds->mds_lov_desc, call
 * mds_lov_update_max_ost() for idx under obd_dev_sem, reinitialise the llog
 * catalog with llog_cat_initialize(), invoke the registered upcall with
 * OBD_NOTIFY_ACTIVE when an owner is set, and free the temporary
 * descriptor.
 *
 * \param obd   MDS obd device
 * \param idx   index of the target
 * \param uuid  uuid of the target (used for logging and llog setup)
 */
506 /* Update the lov desc for a new size lov. */
507 static int mds_lov_update_desc(struct obd_device *obd, int idx,
508 struct obd_uuid *uuid)
510 struct mds_obd *mds = &obd->u.mds;
512 __u32 valsize = sizeof(mds->mds_lov_desc);
516 OBD_ALLOC(ld, sizeof(*ld));
520 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
525 /* Don't change the mds_lov_desc until the objids size matches the
527 mds->mds_lov_desc = *ld;
528 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
529 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
531 mutex_down(&obd->obd_dev_sem);
532 rc = mds_lov_update_max_ost(mds, idx);
533 mutex_up(&obd->obd_dev_sem);
537 /* If we added a target we have to reconnect the llogs */
538 /* We only _need_ to do this at first add (idx), or the first time
539 after recovery. However, it should now be safe to call anytime. */
540 rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
544 /*XXX this notifies the MDD until lov handling use old mds code */
545 if (obd->obd_upcall.onu_owner) {
546 LASSERT(obd->obd_upcall.onu_upcall != NULL);
547 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
548 obd->obd_upcall.onu_owner);
551 OBD_FREE(ld, sizeof(*ld));
/*
 * Bring the MDS view of one target up to date.
 * Calls mds_lov_update_desc() for the watched target, tests the recovery
 * flag, validates idx against ld_tgt_count, fetches the last object id
 * with mds_lov_get_objid(), and sends it to the osc with
 * mds_lov_set_one_nextid().
 *
 * \param obd      MDS obd device
 * \param watched  device whose u.cli.cl_target_uuid names the target
 *
 * \retval -EINVAL  idx is not below mds_lov_desc.ld_tgt_count
 */
555 /* Inform MDS about new/updated target */
556 static int mds_lov_update_mds(struct obd_device *obd,
557 struct obd_device *watched,
560 struct mds_obd *mds = &obd->u.mds;
568 /* Don't let anyone else mess with mds_lov_objids now */
569 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
573 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
574 idx, obd->obd_recovering, obd->obd_async_recov,
575 mds->mds_lov_desc.ld_tgt_count);
577 /* idx is set as data from lov_notify. */
578 if (obd->obd_recovering)
/* Note: the check rejects idx == count as well, although the message
 * text prints '>'. */
581 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
582 CERROR("index %d > count %d!\n", idx,
583 mds->mds_lov_desc.ld_tgt_count);
584 GOTO(out, rc = -EINVAL);
587 rc = mds_lov_get_objid(obd, idx);
591 page = idx / OBJID_PER_PAGE();
592 off = idx % OBJID_PER_PAGE();
593 data = mds->mds_lov_page_array[page];
595 /* We have read this lastid from disk; tell the osc.
596 Don't call this during recovery. */
597 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
599 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
600 /* Don't abort the rest of the sync */
603 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * Connect the MDS to the LOV device named lov_name.
 * Steps visible here: return the stored error when mds_osc_obd already
 * holds an ERR_PTR; test whether mds_osc_obd is already set; look the
 * device up with class_name2obd() (storing ERR_PTR(-ENOTCONN) when it is
 * not found); read the objid file under obd_dev_sem; register this obd as
 * observer of the LOV; mark the LOV with LUSTRE_SP_MDT; build an
 * obd_connect_data with the connect flags, version, object group, brw size
 * and checksum types; call obd_connect() and free the connect data.
 * On a connect failure, and on the error path at the end, mds_osc_obd is
 * replaced by ERR_PTR(rc); the error path also clears mds_osc_exp.
 *
 * \param obd       MDS obd device
 * \param lov_name  name of the LOV device to connect to
 */
610 /* update the LOV-OSC knowledge of the last used object id's */
611 int mds_lov_connect(struct obd_device *obd, char * lov_name)
613 struct mds_obd *mds = &obd->u.mds;
614 struct obd_connect_data *data;
618 if (IS_ERR(mds->mds_osc_obd))
619 RETURN(PTR_ERR(mds->mds_osc_obd));
621 if (mds->mds_osc_obd)
624 mds->mds_osc_obd = class_name2obd(lov_name);
625 if (!mds->mds_osc_obd) {
626 CERROR("MDS cannot locate LOV %s\n", lov_name);
627 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
631 mutex_down(&obd->obd_dev_sem);
632 rc = mds_lov_read_objids(obd);
633 mutex_up(&obd->obd_dev_sem);
635 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
639 rc = obd_register_observer(mds->mds_osc_obd, obd);
641 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
646 mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
648 OBD_ALLOC(data, sizeof(*data));
651 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
652 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
653 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
654 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
655 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
656 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN;
657 #ifdef HAVE_LRU_RESIZE_SUPPORT
658 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
660 data->ocd_version = LUSTRE_VERSION_CODE;
661 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
662 /* send max bytes per rpc */
663 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
664 /* send the list of supported checksum types */
665 data->ocd_cksum_types = OBD_CKSUM_ALL;
666 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
667 rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
668 OBD_FREE(data, sizeof(*data));
670 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
671 mds->mds_osc_obd = ERR_PTR(rc);
675 /* I want to see a callback happen when the OBD moves to a
676 * "For General Use" state, and that's when we'll call
677 * set_nextid(). The class driver can help us here, because
678 * it can use the obd_recovering flag to determine when the
679 * the OBD is full available. */
680 /* MDD device will care about that
681 if (!obd->obd_recovering)
682 rc = mds_postrecov(obd);
687 mds->mds_osc_exp = NULL;
688 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detach the MDS from its LOV device.
 * When mds_osc_obd is a valid pointer and an export exists, the MDS is
 * unregistered as observer and obd_force is set on the LOV device; the
 * comment in the body explains why the final export reference is left for
 * mds_cleanup.
 *
 * \param obd  MDS obd device
 */
692 int mds_lov_disconnect(struct obd_device *obd)
694 struct mds_obd *mds = &obd->u.mds;
698 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
699 obd_register_observer(mds->mds_osc_obd, NULL);
701 /* The actual disconnect of the mds_lov will be called from
702 * class_disconnect_exports from mds_lov_clean. So we have to
703 * ensure that class_cleanup doesn't fail due to the extra ref
704 * we're holding now. The mechanism to do that already exists -
705 * the obd_force flag. We'll drop the final ref to the
706 * mds_osc_exp in mds_cleanup. */
707 mds->mds_osc_obd->obd_force = 1;
/*
 * Argument bundle for a target synchronization.
 * Allocated and filled in mds_lov_start_synchronize() and consumed by
 * mds_lov_synchronize() / __mds_lov_synchronize().
 */
713 struct mds_lov_sync_info {
714 struct obd_device *mlsi_obd; /* the lov device to sync */
715 struct obd_device *mlsi_watched; /* target osc */
716 __u32 mlsi_index; /* index of target */
/*
 * Push the MDS capability keys to the target identified by uuid.
 * Checks mds_capa_keys first; then each of the two entries of
 * mds->mds_capa_keys is sent through obd_set_info_async(KEY_CAPA_KEY)
 * using an mds_capa_info payload that carries the uuid.  A failure is
 * logged with DEBUG_CAPA_KEY at D_ERROR.
 *
 * \param mds   MDS state holding the capability keys
 * \param uuid  uuid of the target
 */
719 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
721 struct mds_capa_info info = { .uuid = uuid };
722 struct lustre_capa_key *key;
727 if (!mds->mds_capa_keys)
/* Exactly two key slots are iterated. */
730 for (i = 0; i < 2; i++) {
731 key = &mds->mds_capa_keys[i];
732 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
735 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
736 KEY_CAPA_KEY, sizeof(info), &info, NULL);
738 DEBUG_CAPA_KEY(D_ERROR, key,
739 "propagate failed (rc = %d) for", rc);
/*
 * Synchronize the MDS with one target; `data` is a struct
 * mds_lov_sync_info.
 * Sequence visible here, performed with mds_notify_lock read-held:
 *   - bail out with -ENODEV when the obd is stopping or failed;
 *   - mds_lov_update_mds() for the target index;
 *   - obd_set_info_async(KEY_MDS_CONN) with the object group of this MDS;
 *   - mds_propagate_capa_keys();
 *   - llog_connect() on the LLOG_MDS_OST_ORIG_CTXT context;
 *   - mds_lov_clear_orphans();
 *   - with HAVE_QUOTA_SUPPORT, an OBD_NOTIFY_QUOTA upcall when an owner is
 *     registered.
 * After the lock is dropped, the failure path logs the error and notifies
 * the LOV that the target is inactive (unless a device is stopping).  The
 * "mds_lov_synchronize" obd reference is released with class_decref().
 */
747 /* We only sync one osc at a time, so that we don't have to hold
748 any kind of lock on the whole mds_lov_desc, which may change
749 (grow) as a result of mds_lov_add_ost. This also avoids any
750 kind of mismatch between the lov_desc and the mds_lov_desc,
751 which are not in lock-step during lov_add_obd */
752 static int __mds_lov_synchronize(void *data)
754 struct mds_lov_sync_info *mlsi = data;
755 struct obd_device *obd = mlsi->mlsi_obd;
756 struct obd_device *watched = mlsi->mlsi_watched;
757 struct mds_obd *mds = &obd->u.mds;
758 struct obd_uuid *uuid;
759 __u32 idx = mlsi->mlsi_index;
760 struct mds_group_info mgi;
761 struct llog_ctxt *ctxt;
769 uuid = &watched->u.cli.cl_target_uuid;
772 down_read(&mds->mds_notify_lock);
773 if (obd->obd_stopping || obd->obd_fail)
774 GOTO(out, rc = -ENODEV);
776 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
777 rc = mds_lov_update_mds(obd, watched, idx);
779 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
782 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
785 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
786 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
789 /* propagate capability keys */
790 rc = mds_propagate_capa_keys(mds, uuid);
794 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
796 GOTO(out, rc = -ENODEV);
798 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
799 rc = llog_connect(ctxt, NULL, NULL, uuid);
802 CERROR("%s failed at llog_origin_connect: %d\n",
803 obd_uuid2str(uuid), rc);
807 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
808 obd->obd_name, obd_uuid2str(uuid));
809 rc = mds_lov_clear_orphans(mds, uuid);
811 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
812 obd_uuid2str(uuid), rc);
816 #ifdef HAVE_QUOTA_SUPPORT
817 if (obd->obd_upcall.onu_owner) {
819 * This is a hack for mds_notify->mdd_notify. When the mds obd
820 * in mdd is removed, This hack should be removed.
822 LASSERT(obd->obd_upcall.onu_upcall != NULL);
823 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
824 obd->obd_upcall.onu_owner);
829 up_read(&mds->mds_notify_lock);
831 /* Deactivate it for safety */
832 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
834 if (!obd->obd_stopping && mds->mds_osc_obd &&
835 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
836 obd_notify(mds->mds_osc_obd, watched,
837 OBD_NOTIFY_INACTIVE, NULL);
840 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background synchronization.
 * Names the thread "ll_sync_<index>" (two-digit target index), daemonizes
 * it with ptlrpc_daemonize(), and returns the result of
 * __mds_lov_synchronize().
 *
 * \param data  struct mds_lov_sync_info describing the target
 */
844 int mds_lov_synchronize(void *data)
846 struct mds_lov_sync_info *mlsi = data;
849 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
850 ptlrpc_daemonize(name);
852 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronizing the MDS with one target.
 * Allocates a struct mds_lov_sync_info, fills it from the arguments and
 * takes an obd reference tagged "mds_lov_synchronize".  Two paths are
 * visible: a kernel thread running mds_lov_synchronize() (the reference is
 * dropped again when the thread cannot be started), and a direct call to
 * __mds_lov_synchronize().  Presumably `nonblock` selects between them;
 * the test itself is not visible in this listing.
 *
 * \param obd       MDS obd device
 * \param watched   target device
 * \param data      pointer to a __u32 holding the target index
 * \param nonblock  presumably non-zero to synchronize in the background
 *
 * NOTE(review): data is dereferenced without a NULL check visible here;
 * confirm callers always pass a valid index pointer.
 */
855 int mds_lov_start_synchronize(struct obd_device *obd,
856 struct obd_device *watched,
857 void *data, int nonblock)
859 struct mds_lov_sync_info *mlsi;
861 struct obd_uuid *uuid;
865 uuid = &watched->u.cli.cl_target_uuid;
867 OBD_ALLOC(mlsi, sizeof(*mlsi));
872 mlsi->mlsi_obd = obd;
873 mlsi->mlsi_watched = watched;
874 mlsi->mlsi_index = *(__u32 *)data;
876 /* Although class_export_get(obd->obd_self_export) would lock
877 the MDS in place, since it's only a self-export
878 it doesn't lock the LOV in place. The LOV can be disconnected
879 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
880 Simply taking an export ref on the LOV doesn't help, because it's
881 still disconnected. Taking an obd reference insures that we don't
882 disconnect the LOV. This of course means a cleanup won't
883 finish for as long as the sync is blocking. */
884 class_incref(obd, "mds_lov_synchronize", obd);
887 /* Synchronize in the background */
888 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
889 CLONE_VM | CLONE_FILES);
891 CERROR("%s: error starting mds_lov_synchronize: %d\n",
893 class_decref(obd, "mds_lov_synchronize", obd);
895 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
896 "thread=%d\n", obd->obd_name,
897 mlsi->mlsi_index, rc);
901 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback invoked for events on a watched device.
 * The events listed as handled are OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and
 * OBD_NOTIFY_SYNC_NONBLOCK.  A notification from a device whose type is
 * not LUSTRE_OSC_NAME is reported with CERROR.  While the obd is
 * recovering, mds_lov_update_desc() is called for the target index; the
 * other visible path calls mds_lov_start_synchronize(), blocking only for
 * OBD_NOTIFY_SYNC.
 *
 * \param obd      MDS obd device (the observer)
 * \param watched  device the event refers to
 * \param ev       notification event
 * \param data     pointer to a __u32 holding the lov index of the target
 */
907 int mds_notify(struct obd_device *obd, struct obd_device *watched,
908 enum obd_notify_event ev, void *data)
913 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
916 /* We only handle these: */
917 case OBD_NOTIFY_ACTIVE:
918 /* lov want one or more _active_ targets for work */
919 /* activate event should be pass lov idx as argument */
920 case OBD_NOTIFY_SYNC:
921 case OBD_NOTIFY_SYNC_NONBLOCK:
922 /* sync event should be pass lov idx as argument */
928 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
929 CERROR("unexpected notification of %s %s!\n",
930 watched->obd_type->typ_name, watched->obd_name);
934 if (obd->obd_recovering) {
935 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
937 obd_uuid2str(&watched->u.cli.cl_target_uuid));
938 /* We still have to fix the lov descriptor for ost's added
939 after the mdt in the config log. They didn't make it into
941 rc = mds_lov_update_desc(obd, *(__u32 *)data,
942 &watched->u.cli.cl_target_uuid);
946 rc = mds_lov_start_synchronize(obd, watched, data,
947 !(ev == OBD_NOTIFY_SYNC));