1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/mds/mds_lov.c
40 * Lustre Metadata Server (mds) handling of striped file data
42 * Author: Peter Braam <braam@clusterfs.com>
45 #define DEBUG_SUBSYSTEM S_MDS
47 #include <linux/module.h>
48 #include <lustre_mds.h>
49 #include <lustre/lustre_idl.h>
50 #include <obd_class.h>
52 #include <lustre_lib.h>
53 #include <lustre_fsfilt.h>
54 #include <obd_cksum.h>
56 #include "mds_internal.h"
/**
 * Debug helper: log, at D_INFO level, the dirty-page bitmap and the cached
 * object ids of the MDS embedded in \a obd.
 *
 * \param label tag printed in the "dump from" line to identify the caller
 * \param obd   device whose u.mds bookkeeping is dumped
 *
 * NOTE(review): the statements that follow the checks below are not visible
 * in this listing; the D_INFO test and the two NULL tests presumably leave
 * the function early -- confirm against the full source.
 */
58 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
60 struct mds_obd *mds = &obd->u.mds;
/* Check whether D_INFO messages are enabled at all. */
63 if ((libcfs_debug & D_INFO) == 0)
66 CDEBUG(D_INFO, "dump from %s\n", label);
67 if (mds->mds_lov_page_dirty == NULL) {
68 CERROR("NULL bitmap!\n");
/* Print the dirty bitmap one long word per message. */
72 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
73 CDEBUG(D_INFO, "%u - %lx\n", i,
74 mds->mds_lov_page_dirty->data[i]);
76 if (mds->mds_lov_page_array == NULL) {
77 CERROR("not init page array!\n");
/* Walk every page slot, then every object id slot inside the page. */
81 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
82 obd_id *data = mds->mds_lov_page_array[i];
87 for(j=0; j < OBJID_PER_PAGE(); j++) {
90 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/**
 * Set up the object-id bookkeeping of the MDS embedded in \a obd: allocate
 * the dirty-page bitmap and the array of page pointers, then open (creating
 * it if necessary) the LOV_OBJID file and store it in mds_lov_objid_filp.
 *
 * Error codes assigned on the visible failure paths:
 *  -ENOMEM         the page pointer array could not be allocated
 *  PTR_ERR(file)   filp_open() failed
 *  -ENOENT         LOV_OBJID exists but is not a regular file
 *
 * The tail of the function is the unwind code: close the file, free the
 * page array, free the bitmap.
 *
 * NOTE(review): the return statements and the cleanup labels named in the
 * GOTO() calls are not visible in this listing -- confirm the unwind order
 * against the full source.
 */
98 int mds_lov_init_objids(struct obd_device *obd)
100 struct mds_obd *mds = &obd->u.mds;
101 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* A page must hold a whole number of object ids. */
106 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
108 mds->mds_lov_page_dirty =
109 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
110 if (mds->mds_lov_page_dirty == NULL)
114 OBD_ALLOC(mds->mds_lov_page_array, size);
115 if (mds->mds_lov_page_array == NULL)
116 GOTO(err_free_bitmap, rc = -ENOMEM);
118 /* open and test the lov objd file */
119 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed in the message below, but the only visible
 * assignment of PTR_ERR(file) to rc is in the GOTO() that follows it --
 * confirm that rc is set before the message is emitted. */
122 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
123 GOTO(err_free, rc = PTR_ERR(file));
125 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
126 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
127 file->f_dentry->d_inode->i_mode);
128 GOTO(err_open, rc = -ENOENT);
130 mds->mds_lov_objid_filp = file;
/* Unwind code for the failure paths above. */
134 if (filp_close((struct file *)file, 0))
135 CERROR("can't close %s after error\n", LOV_OBJID);
137 OBD_FREE(mds->mds_lov_page_array, size);
139 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
/**
 * Release the object-id bookkeeping of the MDS embedded in \a obd: free
 * each cached page and the page pointer array, close the LOV_OBJID file
 * and free the dirty-page bitmap.
 *
 * NOTE(review): the per-page NULL test and the test that guards the
 * "won't close" message are not visible in this listing.
 */
144 void mds_lov_destroy_objids(struct obd_device *obd)
146 struct mds_obd *mds = &obd->u.mds;
150 if (mds->mds_lov_page_array != NULL) {
151 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
152 obd_id *data = mds->mds_lov_page_array[i];
154 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* The array itself was sized as one pointer per page slot. */
156 OBD_FREE(mds->mds_lov_page_array,
157 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
160 if (mds->mds_lov_objid_filp) {
161 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
/* Forget the file pointer regardless of the filp_close() result. */
162 mds->mds_lov_objid_filp = NULL;
164 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
167 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
172 * There are currently two ways to learn the ost count and max ost index:
173 * first - after an ost has connected to the mds and the sync process has finished;
174 * second - from the lmm during recovery, in the case where the mds has no configs
175 * and the ost isn't registered in the mgs.
177 * \param mds pointer to mds structure
178 * \param index maximum ost index
180 * \retval -ENOMEM if there is not enough memory for a new page
181 * \retval 0 if the update succeeded
183 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* Locate the page that holds the slot for this index and the slot's
 * position inside that page. */
185 __u32 page = index / OBJID_PER_PAGE();
186 __u32 off = index % OBJID_PER_PAGE();
187 obd_id *data = mds->mds_lov_page_array[page];
/* NOTE(review): the condition guarding this allocation is not visible in
 * this listing; presumably the page is allocated only when the slot in
 * mds_lov_page_array is still NULL -- confirm. */
190 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
194 mds->mds_lov_page_array[page] = data;
/* Remember the highest index seen so far and where its slot lives. */
197 if (index > mds->mds_lov_objid_max_index) {
198 mds->mds_lov_objid_lastpage = page;
199 mds->mds_lov_objid_lastidx = off;
200 mds->mds_lov_objid_max_index = index;
203 /* workaround - New target not in objids file; increase mdsize */
204 /* ld_tgt_count is used as the max index everywhere, despite its name. */
205 if (data[off] == 0) {
209 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
211 mds->mds_lov_objid_count++;
/* The stripe count is limited both by what lov_mds_md_stripecnt() reports
 * for max_easize and by the number of targets counted so far. */
212 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
213 mds->mds_lov_objid_count);
215 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
216 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
218 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
219 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
220 mds->mds_max_cookiesize);
/**
 * Report whether a non-zero object id is cached for target \a index.
 *
 * \param mds   mds structure holding the cached pages
 * \param index target index; split into a page number and an offset
 *
 * \retval 1 the cached id for \a index is greater than zero
 * \retval 0 otherwise
 *
 * NOTE(review): no NULL test on the page pointer is visible in this
 * listing before it is dereferenced -- confirm against the full source.
 */
227 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
229 __u32 page = index / OBJID_PER_PAGE();
230 __u32 off = index % OBJID_PER_PAGE();
231 obd_id *data = mds->mds_lov_page_array[page];
233 return (data[off] > 0);
/**
 * Make sure the object-id cache has a slot for every OST index referenced
 * by the stripes of \a lmm, by calling mds_lov_update_max_ost() for each
 * stripe while holding obd_dev_mutex.
 *
 * \param obd device whose u.mds cache is updated
 * \param lmm on-disk (little-endian) striping descriptor; the stripe count
 *            and object array are taken from either the v1 or the v3 layout
 *
 * NOTE(review): the case labels of the switch, the handling of a NULL
 * \a lmm and the return statements are not visible in this listing.
 */
236 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
238 struct lov_ost_data_v1 *data;
243 /* if we create file without objects - lmm is NULL */
247 switch (le32_to_cpu(lmm->lmm_magic)) {
249 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
250 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
253 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
254 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
257 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* The cache is modified under obd_dev_mutex. */
262 cfs_mutex_lock(&obd->obd_dev_mutex);
263 for (j = 0; j < count; j++) {
264 __u32 i = le32_to_cpu(data[j].l_ost_idx);
265 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
270 cfs_mutex_unlock(&obd->obd_dev_mutex);
274 EXPORT_SYMBOL(mds_lov_prepare_objids);
277 * Write an llog orphan record about a lost ost object.
278 * A special lsm is allocated with a single stripe; the caller should deallocate it
281 static int mds_log_lost_precreated(struct obd_device *obd,
282 struct lov_stripe_md **lsmp, __u16 *stripes,
283 obd_id id, obd_count count, int idx)
285 struct lov_stripe_md *lsm = *lsmp;
/* NOTE(review): the condition guarding this allocation is not visible in
 * this listing; presumably the lsm is allocated only when *lsmp is NULL
 * and is handed back through lsmp -- confirm. */
290 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
293 /* need only one stripe, save old value */
294 *stripes = lsm->lsm_stripe_count;
295 lsm->lsm_stripe_count = 1;
/* Describe the lost object(s) in stripe 0: first object id, sequence
 * derived from the mds id, and the index of the OST. */
299 lsm->lsm_oinfo[0]->loi_id = id;
300 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
301 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
303 rc = mds_log_op_orphan(obd, lsm, count);
/**
 * Compare the object id of every stripe in \a lmm with the id cached for
 * the stripe's OST index. When the new id is larger, the page is marked
 * dirty; if the device is recovering and ids were skipped, the gap is also
 * reported through mds_log_lost_precreated().
 *
 * \param obd device whose u.mds cache is consulted
 * \param lmm on-disk (little-endian) striping descriptor, v1 or v3 layout
 *
 * NOTE(review): the case labels of the switch, the locking, the statement
 * that stores the new id into the cache and the test that guards the final
 * lsm cleanup are not visible in this listing.
 */
307 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
309 struct mds_obd *mds = &obd->u.mds;
311 struct lov_ost_data_v1 *obj;
312 struct lov_stripe_md *lsm = NULL;
317 /* if we create file without objects - lmm is NULL */
321 switch (le32_to_cpu(lmm->lmm_magic)) {
323 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
324 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
327 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
328 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
331 CERROR("Unknow lmm type %X !\n",
332 le32_to_cpu(lmm->lmm_magic));
336 for (j = 0; j < count; j++) {
337 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
338 obd_id id = le64_to_cpu(obj[j].l_object_id);
339 __u32 page = i / OBJID_PER_PAGE();
340 __u32 idx = i % OBJID_PER_PAGE();
343 data = mds->mds_lov_page_array[page];
345 CDEBUG(D_INODE,"update last object for ost %u"
346 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
347 if (id > data[idx]) {
/* NOTE(review): the difference of two 64-bit ids is narrowed to int here
 * and later printed with %u -- confirm this is intended. */
348 int lost = id - data[idx] - 1;
349 /* we might have lost precreated objects due to VBR */
350 if (lost > 0 && obd->obd_recovering) {
351 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
352 if (!obd->obd_version_recov)
353 CERROR("Unexpected gap in objids\n");
354 /* lsm is allocated if NULL */
355 mds_log_lost_precreated(obd, &lsm, &stripes,
356 data[idx]+1, lost, i);
359 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
363 /* restore stripes number */
364 lsm->lsm_stripe_count = stripes;
365 obd_free_memmd(mds->mds_lov_exp, &lsm);
370 EXPORT_SYMBOL(mds_lov_update_objids);
/**
 * Account for object ids just loaded into \a data: walk the entries,
 * bump mds_lov_objid_count, then recompute mds_max_mdsize and
 * mds_max_cookiesize from the resulting stripe count.
 *
 * NOTE(review): the rest of the parameter list, the condition inside the
 * loop and the return statement are not visible in this listing; the loop
 * bound "count" is presumably the missing parameter -- confirm.
 */
372 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
375 __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
378 for (i = 0; i < count; i++) {
382 mds->mds_lov_objid_count++;
/* Same sizing rule as in mds_lov_update_max_ost(). */
385 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
386 mds->mds_lov_objid_count);
388 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
389 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
391 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
392 "%d/%d\n", stripes, mds->mds_max_mdsize,
393 mds->mds_max_cookiesize);
/**
 * Load the whole LOV_OBJID file into the page cache of the MDS embedded in
 * \a obd, one MDS_LOV_ALLOC_SIZE page at a time, and feed each page to
 * mds_lov_update_from_read().
 *
 * Error codes assigned on the visible failure paths:
 *  -ENOMEM  a page could not be allocated
 *  -EIO     mds_lov_update_from_read() reported a failure
 *
 * NOTE(review): the handling of an empty file, the test on the result of
 * fsfilt_read_record() and the return statement are not visible in this
 * listing.
 */
399 static int mds_lov_read_objids(struct obd_device *obd)
401 struct mds_obd *mds = &obd->u.mds;
403 int i, rc = 0, count = 0, page = 0;
407 /* Read everything in the file, even if our current lov desc
408 has fewer targets. Old targets not in the lov descriptor
409 during mds setup may still have valid objids. */
410 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Number of pages needed to cover the file, rounding up. */
414 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
415 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
416 for (i = 0; i < page; i++) {
418 loff_t off_old = off;
/* Each slot is expected to be empty before the file is loaded. */
420 LASSERT(mds->mds_lov_page_array[i] == NULL);
421 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
422 if (mds->mds_lov_page_array[i] == NULL)
423 GOTO(out, rc = -ENOMEM);
425 data = mds->mds_lov_page_array[i];
427 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
428 MDS_LOV_ALLOC_SIZE, &off);
430 CERROR("Error reading objids %d\n", rc);
433 if (off == off_old) /* hole is read */
434 off += MDS_LOV_ALLOC_SIZE;
/* Number of object ids covered by the distance the offset advanced. */
436 count = (off - off_old) / sizeof(obd_id);
437 if (mds_lov_update_from_read(mds, data, count)) {
438 CERROR("Can't update mds data\n");
439 GOTO(out, rc = -EIO);
/* Record where the last entry lives: last page, last index in it. */
442 mds->mds_lov_objid_lastpage = page - 1;
443 mds->mds_lov_objid_lastidx = count - 1;
445 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
446 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
448 mds_lov_dump_objids("read",obd);
/**
 * Write every dirty page of the object-id cache of the MDS embedded in
 * \a obd back to the LOV_OBJID file. A page's dirty bit is cleared before
 * the write and set again by the code following the write call.
 *
 * NOTE(review): the early exit for an empty bitmap, the remaining
 * arguments of fsfilt_write_record(), the test that guards the re-dirtying
 * and the return statement are not visible in this listing.
 */
453 int mds_lov_write_objids(struct obd_device *obd)
455 struct mds_obd *mds = &obd->u.mds;
459 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
462 mds_lov_dump_objids("write", obd);
464 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
465 obd_id *data = mds->mds_lov_page_array[i];
466 unsigned int size = MDS_LOV_ALLOC_SIZE;
/* File offset of page i. */
467 loff_t off = i * size;
469 LASSERT(data != NULL);
471 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
474 /* check for partially filled last page */
475 if (i == mds->mds_lov_objid_lastpage)
476 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
478 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
479 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
482 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
491 EXPORT_SYMBOL(mds_lov_write_objids);
/**
 * Obtain the last object id for one target: ask the LOV export with
 * KEY_LAST_ID, have the answer stored directly into the cache slot of the
 * target, and mark the page of that slot dirty.
 *
 * NOTE(review): the rest of the parameter list (the body uses "idx"), the
 * condition that decides whether the osc is asked at all, the remaining
 * fields of lastid and the return statement are not visible in this
 * listing.
 */
493 static int mds_lov_get_objid(struct obd_device * obd,
496 struct mds_obd *mds = &obd->u.mds;
497 struct obd_export *lov_exp = mds->mds_lov_exp;
505 page = idx / OBJID_PER_PAGE();
506 off = idx % OBJID_PER_PAGE();
507 data = mds->mds_lov_page_array[page];
510 /* We never read this lastid; ask the osc */
511 struct obd_id_info lastid;
513 size = sizeof(lastid);
/* The reply is written straight into the cache slot. */
515 lastid.data = &data[off];
516 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
517 &size, &lastid, NULL);
521 /* workaround for clean filter */
525 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
527 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
528 idx, data, page, off, data[off]);
/**
 * Issue an obd_create() carrying the OBD_FL_DELORPHAN flag through the LOV
 * export of \a mds, restricted to \a ost_uuid when one is given.
 *
 * \param mds      mds structure whose mds_lov_exp is used
 * \param ost_uuid target to address, or NULL to leave oti_ost_uuid unset
 *
 * NOTE(review): the handling of the obd_create() result and the return
 * statement are not visible in this listing.
 */
533 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
536 struct obdo oa = { 0 };
537 struct obd_trans_info oti = {0};
538 struct lov_stripe_md *empty_ea = NULL;
541 LASSERT(mds->mds_lov_page_array != NULL);
543 /* This create will in fact either create or destroy: If the OST is
544 * missing objects below this ID, they will be created. If it finds
545 * objects above this ID, they will be removed. */
/* oa was already zero-initialised at its declaration; it is cleared
 * once more here before the fields are filled in. */
546 memset(&oa, 0, sizeof(oa));
547 oa.o_flags = OBD_FL_DELORPHAN;
548 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
549 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
550 if (ost_uuid != NULL)
551 oti.oti_ost_uuid = ost_uuid;
553 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
/**
 * Send a KEY_NEXT_ID set_info request through the LOV export of \a obd.
 * Must not be called while \a obd is recovering (asserted).
 *
 * \param obd device whose u.mds.mds_lov_exp is used
 * \param idx target index
 * \param id  pointer to the object id for that target
 *
 * NOTE(review): the lines that fill "info" from \a idx and \a id, the test
 * that guards the error message and the return statement are not visible
 * in this listing.
 */
559 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
561 struct mds_obd *mds = &obd->u.mds;
563 struct obd_id_info info;
566 LASSERT(!obd->obd_recovering);
570 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
571 KEY_NEXT_ID, sizeof(info), &info, NULL);
573 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
579 /* Update the lov desc for a new size lov. */
/*
 * Visible steps: allocate a scratch descriptor, fetch KEY_LOVDESC from the
 * LOV export, copy it into mds->mds_lov_desc, make room for target idx in
 * the object-id cache (under obd_dev_mutex), re-initialise the llogs and
 * free the scratch descriptor.
 *
 * NOTE(review): the declaration of ld, the remaining arguments of
 * obd_get_info(), the error checks between the steps and the return
 * statement are not visible in this listing.
 */
580 static int mds_lov_update_desc(struct obd_device *obd, int idx,
581 struct obd_uuid *uuid)
583 struct mds_obd *mds = &obd->u.mds;
585 __u32 valsize = sizeof(mds->mds_lov_desc);
589 OBD_ALLOC(ld, sizeof(*ld));
593 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
598 /* Don't change the mds_lov_desc until the objids size matches the
600 mds->mds_lov_desc = *ld;
601 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
602 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
604 cfs_mutex_lock(&obd->obd_dev_mutex);
605 rc = mds_lov_update_max_ost(mds, idx);
606 cfs_mutex_unlock(&obd->obd_dev_mutex);
610 /* If we added a target we have to reconnect the llogs */
611 /* We only _need_ to do this at first add (idx), or the first time
612 after recovery. However, it should now be safe to call anytime. */
613 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
/* The scratch descriptor is no longer needed once it has been copied. */
618 OBD_FREE(ld, sizeof(*ld));
622 /* Inform MDS about new/updated target */
/*
 * Visible steps: assert that an object id is cached for the target, log
 * the state, check obd_recovering, reject an index that is not below
 * ld_tgt_count with -EINVAL, fetch the last id through
 * mds_lov_get_objid() and push it to the osc with
 * mds_lov_set_one_nextid().
 *
 * NOTE(review): the rest of the parameter list (the body uses "idx"), the
 * statement executed when obd_recovering is set, the error checks between
 * the steps and the return statement are not visible in this listing.
 */
623 static int mds_lov_update_mds(struct obd_device *obd,
624 struct obd_device *watched,
627 struct mds_obd *mds = &obd->u.mds;
634 LASSERT(mds_lov_objinit(mds, idx));
636 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
637 idx, obd->obd_recovering, obd->obd_async_recov,
638 mds->mds_lov_desc.ld_tgt_count);
640 /* idx is set as data from lov_notify. */
641 if (obd->obd_recovering)
/* The test is ">=" although the message below prints ">". */
644 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
645 CERROR("index %d > count %d!\n", idx,
646 mds->mds_lov_desc.ld_tgt_count);
647 GOTO(out, rc = -EINVAL);
650 rc = mds_lov_get_objid(obd, idx);
654 page = idx / OBJID_PER_PAGE();
655 off = idx % OBJID_PER_PAGE();
656 data = mds->mds_lov_page_array[page];
658 /* We have read this lastid from disk; tell the osc.
659 Don't call this during recovery. */
660 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
662 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
663 /* Don't abort the rest of the sync */
666 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
673 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Visible steps: bail out with the stored error if mds_lov_obd holds an
 * error pointer; look the LOV up by name (storing ERR_PTR(-ENOTCONN) when
 * it is not found); read the objids file under obd_dev_mutex; register
 * this obd as observer of the LOV; ask the LOV for OBD_NOTIFY_CREATE
 * events; build the connect data and connect to the LOV.
 *
 * The tail of the function clears mds_lov_exp and stores ERR_PTR(rc) in
 * mds_lov_obd.
 *
 * NOTE(review): the error tests after each step, the statement executed
 * when mds_lov_obd is already set, the cleanup label and the return
 * statements are not visible in this listing.
 */
674 int mds_lov_connect(struct obd_device *obd, char * lov_name)
676 struct mds_obd *mds = &obd->u.mds;
677 struct obd_connect_data *data;
/* A previous failure is remembered as an error pointer in mds_lov_obd. */
681 if (IS_ERR(mds->mds_lov_obd))
682 RETURN(PTR_ERR(mds->mds_lov_obd));
684 if (mds->mds_lov_obd)
687 mds->mds_lov_obd = class_name2obd(lov_name);
688 if (!mds->mds_lov_obd) {
689 CERROR("MDS cannot locate LOV %s\n", lov_name);
690 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
694 cfs_mutex_lock(&obd->obd_dev_mutex);
695 rc = mds_lov_read_objids(obd);
696 cfs_mutex_unlock(&obd->obd_dev_mutex);
698 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
702 rc = obd_register_observer(mds->mds_lov_obd, obd);
704 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
709 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
711 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
713 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
715 OBD_ALLOC(data, sizeof(*data));
717 GOTO(err_exit, rc = -ENOMEM);
719 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
720 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
721 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FULL20 |
722 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
723 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
724 OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
725 #ifdef HAVE_LRU_RESIZE_SUPPORT
726 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
728 data->ocd_version = LUSTRE_VERSION_CODE;
729 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
730 data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
732 /* send max bytes per rpc */
733 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
734 /* send the list of supported checksum types */
735 data->ocd_cksum_types = cksum_types_supported();
736 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
737 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
/* The connect data is released right after the obd_connect() call,
 * before its result is examined. */
738 OBD_FREE(data, sizeof(*data));
740 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
741 mds->mds_lov_obd = ERR_PTR(rc);
745 /* I want to see a callback happen when the OBD moves to a
746 * "For General Use" state, and that's when we'll call
747 * set_nextid(). The class driver can help us here, because
748 * it can use the obd_recovering flag to determine when the
749 * OBD is fully available. */
750 /* MDD device will care about that
751 if (!obd->obd_recovering)
752 rc = mds_postrecov(obd);
757 mds->mds_lov_exp = NULL;
758 mds->mds_lov_obd = ERR_PTR(rc);
/**
 * Detach the MDS embedded in \a obd from its LOV: when mds_lov_obd is not
 * an error pointer and an export exists, stop observing the LOV and set
 * obd_force on it.
 *
 * NOTE(review): any further teardown and the return statement are not
 * visible in this listing.
 */
762 int mds_lov_disconnect(struct obd_device *obd)
764 struct mds_obd *mds = &obd->u.mds;
768 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
/* Registering a NULL observer removes this obd as observer. */
769 obd_register_observer(mds->mds_lov_obd, NULL);
771 /* The actual disconnect of the mds_lov will be called from
772 * class_disconnect_exports from mds_lov_clean. So we have to
773 * ensure that class_cleanup doesn't fail due to the extra ref
774 * we're holding now. The mechanism to do that already exists -
775 * the obd_force flag. We'll drop the final ref to the
776 * mds_lov_exp in mds_cleanup. */
777 mds->mds_lov_obd->obd_force = 1;
/*
 * Argument bundle for a synchronization run: filled in by
 * mds_lov_start_synchronize() and read by __mds_lov_synchronize().
 *
 * NOTE(review): __mds_lov_synchronize() reads mlsi_obd->u.mds, i.e. it
 * treats mlsi_obd as the MDS device even though the field comment calls it
 * the lov device -- confirm which wording is intended.
 */
783 struct mds_lov_sync_info {
784 struct obd_device *mlsi_obd; /* the lov device to sync */
785 struct obd_device *mlsi_watched; /* target osc */
786 __u32 mlsi_index; /* index of target */
/**
 * Send both entries of mds->mds_capa_keys to the target identified by
 * \a uuid, one KEY_CAPA_KEY set_info request per key.
 *
 * \param mds  mds structure holding the keys and the LOV export
 * \param uuid target uuid, carried in the request's mds_capa_info
 *
 * NOTE(review): the statement executed when mds_capa_keys is NULL, the
 * line that attaches each key to "info", the test that guards the failure
 * message and the return statement are not visible in this listing.
 */
789 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
791 struct mds_capa_info info = { .uuid = uuid };
792 struct lustre_capa_key *key;
797 if (!mds->mds_capa_keys)
800 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
/* Exactly two key slots are walked. */
801 for (i = 0; i < 2; i++) {
802 key = &mds->mds_capa_keys[i];
803 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
806 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
807 KEY_CAPA_KEY, sizeof(info), &info, NULL);
809 DEBUG_CAPA_KEY(D_ERROR, key,
810 "propagate failed (rc = %d) for", rc);
818 /* We only sync one osc at a time, so that we don't have to hold
819 any kind of lock on the whole mds_lov_desc, which may change
820 (grow) as a result of mds_lov_add_ost. This also avoids any
821 kind of mismatch between the lov_desc and the mds_lov_desc,
822 which are not in lock-step during lov_add_obd */
/*
 * Visible steps, all under the read side of mds_notify_lock: refuse with
 * -ENODEV when the obd is stopping or failed; update the MDS view of the
 * target; send KEY_MDS_CONN; propagate the capability keys; connect the
 * llog origin context; clear orphans on the target; optionally forward an
 * OBD_NOTIFY_QUOTA upcall. Afterwards the lock is dropped, the failure
 * path notifies the LOV that the target is inactive, and the obd reference
 * tagged "mds_lov_synchronize" is released.
 *
 * \param data a struct mds_lov_sync_info describing the obd, the watched
 *             target and its index
 *
 * NOTE(review): the error tests between the steps, the release of the
 * sync-info structure, the test that guards the deactivation path and the
 * return statement are not visible in this listing.
 */
823 static int __mds_lov_synchronize(void *data)
825 struct mds_lov_sync_info *mlsi = data;
826 struct obd_device *obd = mlsi->mlsi_obd;
827 struct obd_device *watched = mlsi->mlsi_watched;
828 struct mds_obd *mds = &obd->u.mds;
829 struct obd_uuid *uuid;
830 __u32 idx = mlsi->mlsi_index;
831 struct mds_group_info mgi;
832 struct llog_ctxt *ctxt;
840 uuid = &watched->u.cli.cl_target_uuid;
843 cfs_down_read(&mds->mds_notify_lock);
844 if (obd->obd_stopping || obd->obd_fail)
845 GOTO(out, rc = -ENODEV);
847 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
848 rc = mds_lov_update_mds(obd, watched, idx);
850 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
853 mgi.group = mdt_to_obd_objseq(mds->mds_id);
856 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
857 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
860 /* propagate capability keys */
861 rc = mds_propagate_capa_keys(mds, uuid);
865 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
867 GOTO(out, rc = -ENODEV);
869 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
870 rc = llog_connect(ctxt, NULL, NULL, uuid);
873 CERROR("%s failed at llog_origin_connect: %d\n",
874 obd_uuid2str(uuid), rc);
878 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
879 obd->obd_name, obd_uuid2str(uuid));
881 rc = mds_lov_clear_orphans(mds, uuid);
883 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
884 obd_uuid2str(uuid), rc);
888 #ifdef HAVE_QUOTA_SUPPORT
889 if (obd->obd_upcall.onu_owner) {
891 * This is a hack for mds_notify->mdd_notify. When the mds obd
892 * in mdd is removed, This hack should be removed.
894 LASSERT(obd->obd_upcall.onu_upcall != NULL);
895 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
896 obd->obd_upcall.onu_owner,NULL);
901 cfs_up_read(&mds->mds_notify_lock);
903 /* Deactivate it for safety */
904 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* Only notify when none of the devices involved is shutting down. */
906 if (!obd->obd_stopping && mds->mds_lov_obd &&
907 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
908 obd_notify(mds->mds_lov_obd, watched,
909 OBD_NOTIFY_INACTIVE, NULL);
/* Pairs with the class_incref() in mds_lov_start_synchronize(). */
912 class_decref(obd, "mds_lov_synchronize", obd);
/**
 * Thread entry point for a background synchronization: name the thread
 * "ll_sync_NN" after the target index, daemonize it and run
 * __mds_lov_synchronize() on \a data.
 *
 * \param data a struct mds_lov_sync_info, passed through unchanged
 *
 * NOTE(review): the declaration of the name buffer is not visible in this
 * listing.
 */
916 int mds_lov_synchronize(void *data)
918 struct mds_lov_sync_info *mlsi = data;
921 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
922 cfs_daemonize_ctxt(name);
924 RETURN(__mds_lov_synchronize(data));
/**
 * Start a synchronization of one target: package \a obd, \a watched and
 * the index found in \a data into a struct mds_lov_sync_info, take an obd
 * reference tagged "mds_lov_synchronize", then either spawn a thread
 * running mds_lov_synchronize() (any event other than OBD_NOTIFY_SYNC) or
 * call __mds_lov_synchronize() directly.
 *
 * \param obd     the MDS device
 * \param watched the target osc
 * \param data    pointer to a __u32 holding the target index
 * \param ev      notification event that triggered the sync
 *
 * NOTE(review): the allocation-failure check, the remaining arguments of
 * cfs_create_thread(), the test on its result and the return statement are
 * not visible in this listing.
 */
927 int mds_lov_start_synchronize(struct obd_device *obd,
928 struct obd_device *watched,
929 void *data, enum obd_notify_event ev)
931 struct mds_lov_sync_info *mlsi;
933 struct obd_uuid *uuid;
937 uuid = &watched->u.cli.cl_target_uuid;
939 OBD_ALLOC(mlsi, sizeof(*mlsi));
944 mlsi->mlsi_obd = obd;
945 mlsi->mlsi_watched = watched;
946 mlsi->mlsi_index = *(__u32 *)data;
948 /* Although class_export_get(obd->obd_self_export) would lock
949 the MDS in place, since it's only a self-export
950 it doesn't lock the LOV in place. The LOV can be disconnected
951 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
952 Simply taking an export ref on the LOV doesn't help, because it's
953 still disconnected. Taking an obd reference insures that we don't
954 disconnect the LOV. This of course means a cleanup won't
955 finish for as long as the sync is blocking. */
956 class_incref(obd, "mds_lov_synchronize", obd);
958 if (ev != OBD_NOTIFY_SYNC) {
959 /* Synchronize in the background */
960 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
963 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* Undo the reference taken above (visible right after the
 * thread-start error message). */
965 class_decref(obd, "mds_lov_synchronize", obd);
967 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
968 "thread=%d\n", obd->obd_name,
969 mlsi->mlsi_index, rc);
/* OBD_NOTIFY_SYNC: run the synchronization in the caller's context. */
973 rc = __mds_lov_synchronize((void *)mlsi);
/**
 * Notification handler of the MDS for events about \a watched.
 *
 * Visible steps: log the event; complain when \a watched is not of type
 * LUSTRE_OSC_NAME; forward the event to the registered upcall owner, if
 * any; for OBD_NOTIFY_CREATE update the lov descriptor for the new target;
 * for the ACTIVE/SYNC/SYNC_NONBLOCK events either only update the lov
 * descriptor (while recovering) or start a synchronization.
 *
 * \param obd     the MDS device
 * \param watched device the event is about
 * \param ev      the event
 * \param data    for the handled events, pointer to a __u32 target index
 *
 * NOTE(review): the switch head, the default case, the statements that end
 * each case and the return statements are not visible in this listing,
 * which stops before the end of the function.
 */
979 int mds_notify(struct obd_device *obd, struct obd_device *watched,
980 enum obd_notify_event ev, void *data)
982 struct mds_obd *mds = &obd->u.mds;
986 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
988 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
989 CERROR("unexpected notification of %s %s!\n",
990 watched->obd_type->typ_name, watched->obd_name);
994 /*XXX this notifies the MDD until lov handling use old mds code
997 if (obd->obd_upcall.onu_owner) {
998 LASSERT(obd->obd_upcall.onu_upcall != NULL);
999 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
1000 obd->obd_upcall.onu_owner,
1001 &mds->mds_obt.obt_mount_count);
1005 /* We only handle these: */
1006 case OBD_NOTIFY_CREATE:
1007 CWARN("MDS %s: add target %s\n",obd->obd_name,
1008 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1009 /* We still have to fix the lov descriptor for ost's */
1011 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1012 &watched->u.cli.cl_target_uuid);
1014 case OBD_NOTIFY_ACTIVE:
1015 /* lov want one or more _active_ targets for work */
1016 /* activate event should be pass lov idx as argument */
1017 case OBD_NOTIFY_SYNC:
1018 case OBD_NOTIFY_SYNC_NONBLOCK:
1019 /* sync event should be pass lov idx as argument */
1025 if (obd->obd_recovering) {
1026 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1028 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1029 /* We still have to fix the lov descriptor for ost's added
1030 after the mdt in the config log. They didn't make it into
1032 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1033 &watched->u.cli.cl_target_uuid);
1035 rc = mds_lov_start_synchronize(obd, watched, data, ev);