4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 if ((libcfs_debug & D_INFO) == 0)
64 CDEBUG(D_INFO, "dump from %s\n", label);
65 if (mds->mds_lov_page_dirty == NULL) {
66 CERROR("NULL bitmap!\n");
70 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71 CDEBUG(D_INFO, "%u - %lx\n", i,
72 mds->mds_lov_page_dirty->data[i]);
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
79 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty =
107 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108 if (mds->mds_lov_page_dirty == NULL)
112 OBD_ALLOC(mds->mds_lov_page_array, size);
113 if (mds->mds_lov_page_array == NULL)
114 GOTO(err_free_bitmap, rc = -ENOMEM);
116 /* open and test the lov objid file */
117 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
120 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121 GOTO(err_free, rc = PTR_ERR(file));
123 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125 file->f_dentry->d_inode->i_mode);
126 GOTO(err_open, rc = -ENOENT);
128 mds->mds_lov_objid_filp = file;
132 if (filp_close((struct file *)file, 0))
133 CERROR("can't close %s after error\n", LOV_OBJID);
135 OBD_FREE(mds->mds_lov_page_array, size);
137 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
170 * There are currently two ways to learn the ost count and the max ost index:
171 * first - after the ost is connected to the mds and the sync process has finished;
172 * second - from the lmm during recovery, in case the mds does not have configs
173 * and the ost isn't registered in the mgs.
175 * \param mds pointer to mds structure
176 * \param index maximum ost index
178 * \retval -ENOMEM if there is not enough memory for a new page
179 * \retval 0 if the update succeeded
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
183 __u32 page = index / OBJID_PER_PAGE();
184 __u32 off = index % OBJID_PER_PAGE();
185 obd_id *data = mds->mds_lov_page_array[page];
188 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
192 mds->mds_lov_page_array[page] = data;
195 if (index > mds->mds_lov_objid_max_index) {
196 mds->mds_lov_objid_lastpage = page;
197 mds->mds_lov_objid_lastidx = off;
198 mds->mds_lov_objid_max_index = index;
201 /* workaround - New target not in objids file; increase mdsize */
202 /* ld_tgt_count is used as the max index everywhere, despite its name. */
203 if (data[off] == 0) {
207 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
209 mds->mds_lov_objid_count++;
210 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
211 mds->mds_lov_objid_count);
213 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
214 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
216 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
217 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
218 mds->mds_max_cookiesize);
225 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
227 __u32 page = index / OBJID_PER_PAGE();
228 __u32 off = index % OBJID_PER_PAGE();
229 obd_id *data = mds->mds_lov_page_array[page];
231 return (data[off] > 0);
234 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
236 struct lov_ost_data_v1 *data;
241 /* if we create file without objects - lmm is NULL */
245 switch (le32_to_cpu(lmm->lmm_magic)) {
247 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
248 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
251 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
252 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
255 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
260 cfs_mutex_lock(&obd->obd_dev_mutex);
261 for (j = 0; j < count; j++) {
262 __u32 i = le32_to_cpu(data[j].l_ost_idx);
263 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
268 cfs_mutex_unlock(&obd->obd_dev_mutex);
272 EXPORT_SYMBOL(mds_lov_prepare_objids);
275 * Write an llog orphan record about a lost ost object.
276 * A special lsm is allocated with a single stripe; the caller should deallocate it
279 static int mds_log_lost_precreated(struct obd_device *obd,
280 struct lov_stripe_md **lsmp, __u16 *stripes,
281 obd_id id, obd_count count, int idx)
283 struct lov_stripe_md *lsm = *lsmp;
288 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
291 /* need only one stripe, save old value */
292 *stripes = lsm->lsm_stripe_count;
293 lsm->lsm_stripe_count = 1;
297 lsm->lsm_oinfo[0]->loi_id = id;
298 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
299 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
301 rc = mds_log_op_orphan(obd, lsm, count);
305 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
307 struct mds_obd *mds = &obd->u.mds;
309 struct lov_ost_data_v1 *obj;
310 struct lov_stripe_md *lsm = NULL;
315 /* if we create file without objects - lmm is NULL */
319 switch (le32_to_cpu(lmm->lmm_magic)) {
321 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
322 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
325 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
326 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
329 CERROR("Unknow lmm type %X !\n",
330 le32_to_cpu(lmm->lmm_magic));
334 for (j = 0; j < count; j++) {
335 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
336 obd_id id = le64_to_cpu(obj[j].l_object_id);
337 __u32 page = i / OBJID_PER_PAGE();
338 __u32 idx = i % OBJID_PER_PAGE();
341 data = mds->mds_lov_page_array[page];
343 CDEBUG(D_INODE,"update last object for ost %u"
344 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
345 if (id > data[idx]) {
346 int lost = id - data[idx] - 1;
347 /* we might have lost precreated objects due to VBR */
348 if (lost > 0 && obd->obd_recovering) {
349 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
350 if (!obd->obd_version_recov)
351 CERROR("Unexpected gap in objids\n");
352 /* lsm is allocated if NULL */
353 mds_log_lost_precreated(obd, &lsm, &stripes,
354 data[idx]+1, lost, i);
357 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
361 /* restore stripes number */
362 lsm->lsm_stripe_count = stripes;
363 obd_free_memmd(mds->mds_lov_exp, &lsm);
368 EXPORT_SYMBOL(mds_lov_update_objids);
370 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
373 __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
376 for (i = 0; i < count; i++) {
380 mds->mds_lov_objid_count++;
383 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
384 mds->mds_lov_objid_count);
386 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
387 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
389 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
390 "%d/%d\n", stripes, mds->mds_max_mdsize,
391 mds->mds_max_cookiesize);
397 static int mds_lov_read_objids(struct obd_device *obd)
399 struct mds_obd *mds = &obd->u.mds;
401 int i, rc = 0, count = 0, page = 0;
405 /* Read everything in the file, even if our current lov desc
406 has fewer targets. Old targets not in the lov descriptor
407 during mds setup may still have valid objids. */
408 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
412 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
413 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
414 for (i = 0; i < page; i++) {
416 loff_t off_old = off;
418 LASSERT(mds->mds_lov_page_array[i] == NULL);
419 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
420 if (mds->mds_lov_page_array[i] == NULL)
421 GOTO(out, rc = -ENOMEM);
423 data = mds->mds_lov_page_array[i];
425 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
426 MDS_LOV_ALLOC_SIZE, &off);
428 CERROR("Error reading objids %d\n", rc);
431 if (off == off_old) /* hole is read */
432 off += MDS_LOV_ALLOC_SIZE;
434 count = (off - off_old) / sizeof(obd_id);
435 if (mds_lov_update_from_read(mds, data, count)) {
436 CERROR("Can't update mds data\n");
437 GOTO(out, rc = -EIO);
440 mds->mds_lov_objid_lastpage = page - 1;
441 mds->mds_lov_objid_lastidx = count - 1;
443 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
444 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
446 mds_lov_dump_objids("read",obd);
451 int mds_lov_write_objids(struct obd_device *obd)
453 struct mds_obd *mds = &obd->u.mds;
457 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
460 mds_lov_dump_objids("write", obd);
462 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
463 obd_id *data = mds->mds_lov_page_array[i];
464 unsigned int size = MDS_LOV_ALLOC_SIZE;
465 loff_t off = i * size;
467 LASSERT(data != NULL);
469 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
472 /* check for partially filled last page */
473 if (i == mds->mds_lov_objid_lastpage)
474 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
476 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
477 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
480 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
489 EXPORT_SYMBOL(mds_lov_write_objids);
491 static int mds_lov_get_objid(struct obd_device * obd,
494 struct mds_obd *mds = &obd->u.mds;
495 struct obd_export *lov_exp = mds->mds_lov_exp;
503 page = idx / OBJID_PER_PAGE();
504 off = idx % OBJID_PER_PAGE();
505 data = mds->mds_lov_page_array[page];
508 /* We never read this lastid; ask the osc */
509 struct obd_id_info lastid;
511 size = sizeof(lastid);
513 lastid.data = &data[off];
514 rc = obd_get_info(NULL, lov_exp, sizeof(KEY_LAST_ID),
515 KEY_LAST_ID, &size, &lastid, NULL);
519 /* workaround for clean filter */
523 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
525 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
526 idx, data, page, off, data[off]);
531 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
534 struct obdo oa = { 0 };
535 struct obd_trans_info oti = {0};
536 struct lov_stripe_md *empty_ea = NULL;
539 LASSERT(mds->mds_lov_page_array != NULL);
541 /* This create will in fact either create or destroy: If the OST is
542 * missing objects below this ID, they will be created. If it finds
543 * objects above this ID, they will be removed. */
544 memset(&oa, 0, sizeof(oa));
545 oa.o_flags = OBD_FL_DELORPHAN;
546 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
547 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
548 if (ost_uuid != NULL)
549 oti.oti_ost_uuid = ost_uuid;
551 rc = obd_create(NULL, mds->mds_lov_exp, &oa, &empty_ea, &oti);
557 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
559 struct mds_obd *mds = &obd->u.mds;
561 struct obd_id_info info;
564 LASSERT(!obd->obd_recovering);
568 rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
569 KEY_NEXT_ID, sizeof(info), &info, NULL);
571 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
577 /* Update the lov desc for a new size lov. */
578 static int mds_lov_update_desc(struct obd_device *obd, int idx,
579 struct obd_uuid *uuid)
581 struct mds_obd *mds = &obd->u.mds;
583 __u32 valsize = sizeof(mds->mds_lov_desc);
587 OBD_ALLOC(ld, sizeof(*ld));
591 rc = obd_get_info(NULL, mds->mds_lov_exp, sizeof(KEY_LOVDESC),
592 KEY_LOVDESC, &valsize, ld, NULL);
596 /* Don't change the mds_lov_desc until the objids size matches the
598 mds->mds_lov_desc = *ld;
599 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
600 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
602 cfs_mutex_lock(&obd->obd_dev_mutex);
603 rc = mds_lov_update_max_ost(mds, idx);
604 cfs_mutex_unlock(&obd->obd_dev_mutex);
608 /* If we added a target we have to reconnect the llogs */
609 /* We only _need_ to do this at first add (idx), or the first time
610 after recovery. However, it should now be safe to call anytime. */
611 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
616 OBD_FREE(ld, sizeof(*ld));
620 /* Inform MDS about new/updated target */
621 static int mds_lov_update_mds(struct obd_device *obd,
622 struct obd_device *watched,
625 struct mds_obd *mds = &obd->u.mds;
632 LASSERT(mds_lov_objinit(mds, idx));
634 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
635 idx, obd->obd_recovering, obd->obd_async_recov,
636 mds->mds_lov_desc.ld_tgt_count);
638 /* idx is set as data from lov_notify. */
639 if (obd->obd_recovering)
642 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
643 CERROR("index %d > count %d!\n", idx,
644 mds->mds_lov_desc.ld_tgt_count);
645 GOTO(out, rc = -EINVAL);
648 rc = mds_lov_get_objid(obd, idx);
652 page = idx / OBJID_PER_PAGE();
653 off = idx % OBJID_PER_PAGE();
654 data = mds->mds_lov_page_array[page];
656 /* We have read this lastid from disk; tell the osc.
657 Don't call this during recovery. */
658 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
660 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
661 /* Don't abort the rest of the sync */
664 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
671 /* update the LOV-OSC knowledge of the last used object id's */
672 int mds_lov_connect(struct obd_device *obd, char * lov_name)
674 struct mds_obd *mds = &obd->u.mds;
675 struct obd_connect_data *data;
679 if (IS_ERR(mds->mds_lov_obd))
680 RETURN(PTR_ERR(mds->mds_lov_obd));
682 if (mds->mds_lov_obd)
685 mds->mds_lov_obd = class_name2obd(lov_name);
686 if (!mds->mds_lov_obd) {
687 CERROR("MDS cannot locate LOV %s\n", lov_name);
688 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
692 cfs_mutex_lock(&obd->obd_dev_mutex);
693 rc = mds_lov_read_objids(obd);
694 cfs_mutex_unlock(&obd->obd_dev_mutex);
696 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
700 rc = obd_register_observer(mds->mds_lov_obd, obd);
702 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
707 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
709 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
711 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
713 OBD_ALLOC(data, sizeof(*data));
715 GOTO(err_exit, rc = -ENOMEM);
717 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
718 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
719 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FULL20 |
720 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
721 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
722 OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
723 #ifdef HAVE_LRU_RESIZE_SUPPORT
724 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
726 data->ocd_version = LUSTRE_VERSION_CODE;
727 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
728 data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
730 /* send max bytes per rpc */
731 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
732 /* send the list of supported checksum types */
733 data->ocd_cksum_types = cksum_types_supported_client();
734 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
735 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
736 OBD_FREE(data, sizeof(*data));
738 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
739 mds->mds_lov_obd = ERR_PTR(rc);
743 /* I want to see a callback happen when the OBD moves to a
744 * "For General Use" state, and that's when we'll call
745 * set_nextid(). The class driver can help us here, because
746 * it can use the obd_recovering flag to determine when
747 * the OBD is fully available. */
748 /* MDD device will care about that
749 if (!obd->obd_recovering)
750 rc = mds_postrecov(obd);
755 mds->mds_lov_exp = NULL;
756 mds->mds_lov_obd = ERR_PTR(rc);
760 int mds_lov_disconnect(struct obd_device *obd)
762 struct mds_obd *mds = &obd->u.mds;
766 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
767 obd_register_observer(mds->mds_lov_obd, NULL);
769 /* The actual disconnect of the mds_lov will be called from
770 * class_disconnect_exports from mds_lov_clean. So we have to
771 * ensure that class_cleanup doesn't fail due to the extra ref
772 * we're holding now. The mechanism to do that already exists -
773 * the obd_force flag. We'll drop the final ref to the
774 * mds_lov_exp in mds_cleanup. */
775 mds->mds_lov_obd->obd_force = 1;
781 struct mds_lov_sync_info {
782 struct obd_device *mlsi_obd; /* the lov device to sync */
783 struct obd_device *mlsi_watched; /* target osc */
784 __u32 mlsi_index; /* index of target */
787 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
789 struct mds_capa_info info = { .uuid = uuid };
790 struct lustre_capa_key *key;
795 if (!mds->mds_capa_keys)
798 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
799 for (i = 0; i < 2; i++) {
800 key = &mds->mds_capa_keys[i];
801 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
804 rc = obd_set_info_async(NULL, mds->mds_lov_exp,
805 sizeof(KEY_CAPA_KEY), KEY_CAPA_KEY,
806 sizeof(info), &info, NULL);
808 DEBUG_CAPA_KEY(D_ERROR, key,
809 "propagate failed (rc = %d) for", rc);
817 /* We only sync one osc at a time, so that we don't have to hold
818 any kind of lock on the whole mds_lov_desc, which may change
819 (grow) as a result of mds_lov_add_ost. This also avoids any
820 kind of mismatch between the lov_desc and the mds_lov_desc,
821 which are not in lock-step during lov_add_obd */
822 static int __mds_lov_synchronize(void *data)
824 struct mds_lov_sync_info *mlsi = data;
825 struct obd_device *obd = mlsi->mlsi_obd;
826 struct obd_device *watched = mlsi->mlsi_watched;
827 struct mds_obd *mds = &obd->u.mds;
828 struct obd_uuid *uuid;
829 __u32 idx = mlsi->mlsi_index;
830 struct mds_group_info mgi;
831 struct llog_ctxt *ctxt;
839 uuid = &watched->u.cli.cl_target_uuid;
842 cfs_down_read(&mds->mds_notify_lock);
843 if (obd->obd_stopping || obd->obd_fail)
844 GOTO(out, rc = -ENODEV);
846 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
847 rc = mds_lov_update_mds(obd, watched, idx);
849 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
852 mgi.group = mdt_to_obd_objseq(mds->mds_id);
855 rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
856 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
859 /* propagate capability keys */
860 rc = mds_propagate_capa_keys(mds, uuid);
864 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
866 GOTO(out, rc = -ENODEV);
868 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
869 rc = llog_connect(ctxt, NULL, NULL, uuid);
872 CERROR("%s failed at llog_origin_connect: %d\n",
873 obd_uuid2str(uuid), rc);
877 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
878 obd->obd_name, obd_uuid2str(uuid));
880 rc = mds_lov_clear_orphans(mds, uuid);
882 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
883 obd_uuid2str(uuid), rc);
887 #ifdef HAVE_QUOTA_SUPPORT
888 if (obd->obd_upcall.onu_owner) {
890 * This is a hack for mds_notify->mdd_notify. When the mds obd
891 * in mdd is removed, this hack should be removed.
893 LASSERT(obd->obd_upcall.onu_upcall != NULL);
894 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
895 obd->obd_upcall.onu_owner,NULL);
901 /* Deactivate it for safety */
902 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
904 if (!obd->obd_stopping && mds->mds_lov_obd &&
905 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
906 obd_notify(mds->mds_lov_obd, watched,
907 OBD_NOTIFY_INACTIVE, NULL);
909 cfs_up_read(&mds->mds_notify_lock);
911 class_decref(obd, "mds_lov_synchronize", obd);
915 int mds_lov_synchronize(void *data)
917 struct mds_lov_sync_info *mlsi = data;
920 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
921 cfs_daemonize_ctxt(name);
923 RETURN(__mds_lov_synchronize(data));
926 int mds_lov_start_synchronize(struct obd_device *obd,
927 struct obd_device *watched,
928 void *data, enum obd_notify_event ev)
930 struct mds_lov_sync_info *mlsi;
932 struct obd_uuid *uuid;
936 uuid = &watched->u.cli.cl_target_uuid;
938 OBD_ALLOC(mlsi, sizeof(*mlsi));
943 mlsi->mlsi_obd = obd;
944 mlsi->mlsi_watched = watched;
945 mlsi->mlsi_index = *(__u32 *)data;
947 /* Although class_export_get(obd->obd_self_export) would lock
948 the MDS in place, since it's only a self-export
949 it doesn't lock the LOV in place. The LOV can be disconnected
950 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
951 Simply taking an export ref on the LOV doesn't help, because it's
952 still disconnected. Taking an obd reference ensures that we don't
953 disconnect the LOV. This of course means a cleanup won't
954 finish for as long as the sync is blocking. */
955 class_incref(obd, "mds_lov_synchronize", obd);
957 if (ev != OBD_NOTIFY_SYNC) {
958 /* Synchronize in the background */
959 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
962 CERROR("%s: error starting mds_lov_synchronize: %d\n",
964 class_decref(obd, "mds_lov_synchronize", obd);
966 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
967 "thread=%d\n", obd->obd_name,
968 mlsi->mlsi_index, rc);
972 rc = __mds_lov_synchronize((void *)mlsi);
978 int mds_notify(struct obd_device *obd, struct obd_device *watched,
979 enum obd_notify_event ev, void *data)
981 struct mds_obd *mds = &obd->u.mds;
985 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
987 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
988 CERROR("unexpected notification of %s %s!\n",
989 watched->obd_type->typ_name, watched->obd_name);
993 /*XXX this notifies the MDD until lov handling use old mds code
996 if (obd->obd_upcall.onu_owner) {
997 LASSERT(obd->obd_upcall.onu_upcall != NULL);
998 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
999 obd->obd_upcall.onu_owner,
1000 &mds->mds_obt.obt_mount_count);
1004 /* We only handle these: */
1005 case OBD_NOTIFY_CREATE:
1006 CDEBUG(D_CONFIG, "%s: add target %s\n", obd->obd_name,
1007 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1008 /* We still have to fix the lov descriptor for ost's */
1010 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1011 &watched->u.cli.cl_target_uuid);
1013 case OBD_NOTIFY_ACTIVE:
1014 /* lov wants one or more _active_ targets in order to work */
1015 /* the activate event should pass the lov idx as argument */
1016 case OBD_NOTIFY_SYNC:
1017 case OBD_NOTIFY_SYNC_NONBLOCK:
1018 /* the sync event should pass the lov idx as argument */
1024 if (obd->obd_recovering) {
1025 CDEBUG(D_CONFIG, "%s: Is in recovery, "
1026 "not resetting orphans on %s\n",
1028 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1029 /* We still have to fix the lov descriptor for ost's added
1030 after the mdt in the config log. They didn't make it into
1032 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1033 &watched->u.cli.cl_target_uuid);
1035 rc = mds_lov_start_synchronize(obd, watched, data, ev);