1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/* Fold a caller-supplied array of last-used object ids into the MDS's cached
 * table (mds->mds_lov_objids).  Only the first mds_lov_desc.ld_tgt_count
 * entries of ids are examined.  A cached entry is only ever raised, never
 * lowered, and any raise sets mds_lov_objids_dirty, the flag that
 * mds_lov_write_objids() tests before writing. */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
/* keep whichever of the cached and supplied id is larger */
52 if (ids[i] > (mds->mds_lov_objids)[i]) {
53 (mds->mds_lov_objids)[i] = ids[i];
54 mds->mds_lov_objids_dirty = 1;
59 EXPORT_SYMBOL(mds_lov_update_objids);
/* Load the saved last-object ids from mds_lov_objid_filp into
 * mds->mds_lov_objids.  Asserts that no objid table exists yet and that
 * nothing is dirty.  The whole file is read: mds_lov_objids_size becomes the
 * file size in bytes and mds_lov_objids_in_file the number of whole entries
 * (size / sizeof(*ids)).  The CERROR below reports a failed read. */
61 static int mds_lov_read_objids(struct obd_device *obd)
63 struct mds_obd *mds = &obd->u.mds;
69 LASSERT(!mds->mds_lov_objids_size);
70 LASSERT(!mds->mds_lov_objids_dirty);
72 /* Read everything in the file, even if our current lov desc
73 has fewer targets. Old targets not in the lov descriptor
74 during mds setup may still have valid objids. */
75 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* install the buffer and its byte size before it is filled from the file */
82 mds->mds_lov_objids = ids;
83 mds->mds_lov_objids_size = size;
85 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
87 CERROR("Error reading objids %d\n", rc);
/* remember how many entries came from disk; later code compares target
 * indices and counts against this value */
91 mds->mds_lov_objids_in_file = size / sizeof(*ids);
93 for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95 mds->mds_lov_objids[i], i);
/* Write the cached last-object ids (mds->mds_lov_objids) back to
 * mds_lov_objid_filp.  The dirty flag is tested first.  The number of entries
 * written is the larger of the current LOV target count and the number of
 * entries previously read from the file, so the record never shrinks below
 * what was on disk. */
100 int mds_lov_write_objids(struct obd_device *obd)
102 struct mds_obd *mds = &obd->u.mds;
107 if (!mds->mds_lov_objids_dirty)
110 tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
115 for (i = 0; i < tgts; i++)
116 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117 mds->mds_lov_objids[i], i);
119 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120 mds->mds_lov_objids, tgts * sizeof(obd_id),
/* NOTE(review): the condition guarding this clear is not visible in this
 * view; presumably the flag is only dropped when the write succeeded */
123 mds->mds_lov_objids_dirty = 0;
129 EXPORT_SYMBOL(mds_lov_write_objids);
/* Issue an obd_create() through mds_osc_exp with OBD_FL_DELORPHAN set, in
 * this MDS's object group (FILTER_GROUP_MDS0 + mds_id).  Requires the objid
 * table to exist (asserted).  When ost_uuid is non-NULL it is copied into the
 * obdo's inline area and OBD_MD_FLINLINE is set; presumably this limits the
 * operation to that one OST (confirm against the LOV create handler). */
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
135 struct obd_trans_info oti = {0};
136 struct lov_stripe_md *empty_ea = NULL;
139 LASSERT(mds->mds_lov_objids != NULL);
141 /* This create will in fact either create or destroy: If the OST is
142 * missing objects below this ID, they will be created. If it finds
143 * objects above this ID, they will be removed. */
144 memset(&oa, 0, sizeof(oa));
145 oa.o_flags = OBD_FL_DELORPHAN;
146 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148 if (ost_uuid != NULL) {
/* NOTE(review): assumes oa.o_inline is at least sizeof(struct obd_uuid)
 * bytes -- confirm against the obdo definition */
149 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150 oa.o_valid |= OBD_MD_FLINLINE;
152 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
157 /* update the LOV-OSC knowledge of the last used object id's */
/* Sends the first mds_lov_desc.ld_tgt_count entries of mds_lov_objids through
 * mds_osc_exp under KEY_NEXT_ID.  Asserts that the obd is not recovering and
 * that the objid table exists.  A failure is reported via CERROR.
 * NOTE(review): the key length here is strlen(KEY_NEXT_ID) while the
 * KEY_LOVDESC lookup in mds_lov_update_desc() passes strlen(...) + 1 --
 * confirm which convention the receiving side expects. */
158 int mds_lov_set_nextid(struct obd_device *obd)
160 struct mds_obd *mds = &obd->u.mds;
164 LASSERT(!obd->obd_recovering);
166 LASSERT(mds->mds_lov_objids != NULL);
168 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
170 mds->mds_lov_desc.ld_tgt_count *
171 sizeof(*mds->mds_lov_objids),
172 mds->mds_lov_objids, NULL);
175 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
181 /* Update the lov desc for a new size lov. */
/* Fetches the current lov descriptor from the lov export into a scratch
 * buffer (KEY_LOVDESC), grows mds_lov_objids when the new target count needs
 * more bytes than are currently allocated (old ids are copied across, the old
 * table is freed), then installs the descriptor, recomputes mds_max_mdsize
 * and mds_max_cookiesize from the stripe count, re-initializes the llog
 * catalogs under obd_dev_sem and finally calls the registered upcall, if one
 * is set.  Allocation failure of the new table yields -ENOMEM.
 * NOTE(review): in the "updated max_mdsize/max_cookiesize" CDEBUG the format
 * consumes the stripe count first ("for %d stripes: %d/%d") but the first two
 * arguments passed are the two sizes, so the printed values look transposed;
 * confirm the full argument list and reorder. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
184 struct mds_obd *mds = &obd->u.mds;
186 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
/* scratch descriptor; released at the end of the function */
190 OBD_ALLOC(ld, sizeof(*ld));
194 rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
199 /* The size of the LOV target table may have increased. */
200 size = ld->ld_tgt_count * sizeof(obd_id);
201 if ((mds->mds_lov_objids_size == 0) ||
202 (size > mds->mds_lov_objids_size)) {
205 /* add room by powers of 2 */
207 while (size < ld->ld_tgt_count)
/* convert the rounded-up entry count into a byte size */
209 size = size * sizeof(obd_id);
211 OBD_ALLOC(ids, size);
213 GOTO(out, rc = -ENOMEM);
/* NOTE(review): possibly redundant if OBD_ALLOC already returns zeroed
 * memory -- confirm before removing */
214 memset(ids, 0, size);
215 if (mds->mds_lov_objids_size) {
/* carry the existing ids over, then release the old table using its old
 * byte size (mds_lov_objids_size is only updated further down) */
216 obd_id *old_ids = mds->mds_lov_objids;
217 memcpy(ids, mds->mds_lov_objids,
218 mds->mds_lov_objids_size);
219 mds->mds_lov_objids = ids;
220 OBD_FREE(old_ids, mds->mds_lov_objids_size);
222 mds->mds_lov_objids = ids;
223 mds->mds_lov_objids_size = size;
226 /* Don't change the mds_lov_desc until the objids size matches the
228 mds->mds_lov_desc = *ld;
229 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230 mds->mds_lov_desc.ld_tgt_count);
232 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233 max(mds->mds_lov_desc.ld_tgt_count,
234 mds->mds_lov_objids_in_file));
235 mds->mds_max_mdsize = lov_mds_md_size(stripes);
236 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
241 /* If we added a target we have to reconnect the llogs */
242 /* We only _need_ to do this at first add (idx), or the first time
243 after recovery. However, it should now be safe to call anytime. */
244 mutex_down(&obd->obd_dev_sem);
245 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246 mutex_up(&obd->obd_dev_sem);
248 /*XXX this notifies the MDD until lov handling use old mds code */
249 if (obd->obd_upcall.onu_owner) {
250 LASSERT(obd->obd_upcall.onu_upcall != NULL);
251 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252 obd->obd_upcall.onu_owner);
255 OBD_FREE(ld, sizeof(*ld));
/* Sentinel meaning "no target index was supplied".  It is stored in and
 * compared against __u32 index values, where -1 converts to the all-ones
 * value. */
260 #define MDSLOV_NO_INDEX -1
262 /* Inform MDS about new/updated target */
/* Refreshes the lov descriptor via mds_lov_update_desc() and then, when a
 * real index was supplied and the obd is not recovering, brings the last
 * object id for that target in sync:
 *  - idx at or beyond the number of ids read from disk: ask the watched
 *    OSC for "last_id", store it in mds_lov_objids[idx] and write the table;
 *  - otherwise: push the ids we hold to the OSCs via mds_lov_set_nextid().
 * NOTE(review): the CERROR text says "index > count" but the test that
 * triggers it is idx >= ld_tgt_count. */
263 static int mds_lov_update_mds(struct obd_device *obd,
264 struct obd_device *watched,
265 __u32 idx, struct obd_uuid *uuid)
267 struct mds_obd *mds = &obd->u.mds;
/* remembered only for the debug message below */
272 old_count = mds->mds_lov_desc.ld_tgt_count;
273 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
277 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279 mds->mds_lov_desc.ld_tgt_count);
281 /* idx is set as data from lov_notify. */
282 if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
283 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284 CERROR("index %d > count %d!\n", idx,
285 mds->mds_lov_desc.ld_tgt_count);
289 if (idx >= mds->mds_lov_objids_in_file) {
290 /* We never read this lastid; ask the osc */
292 __u32 size = sizeof(lastid);
293 rc = obd_get_info(watched->obd_self_export,
295 "last_id", &size, &lastid);
/* record the OSC's answer and persist the table */
298 mds->mds_lov_objids[idx] = lastid;
299 mds->mds_lov_objids_dirty = 1;
300 mds_lov_write_objids(obd);
302 /* We have read this lastid from disk; tell the osc.
303 Don't call this during recovery. */
304 rc = mds_lov_set_nextid(obd);
307 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308 mds->mds_lov_objids[idx], idx);
314 /* Connect the MDS to the LOV named lov_name and bring the objid table up to date */
/* Sequence, as far as it is visible here:
 *  - if mds_osc_obd already holds an error pointer, return that error; if it
 *    is already set, nothing more is looked up;
 *  - look the LOV up by name (failure is remembered as ERR_PTR(-ENOTCONN));
 *  - build the connect data (version, index, request portal, 64-bit quota,
 *    OSS capabilities, optionally LRU resize) and connect; a connect failure
 *    is remembered as ERR_PTR(rc) in mds_osc_obd;
 *  - register this obd as observer of the LOV and refuse client connections
 *    (obd_no_conn) until mds_notify() sees OBD_NOTIFY_CONFIG;
 *  - read the saved objids, refresh the lov descriptor, initialize the llog
 *    catalogs;
 *  - if the LOV has more targets than ids were found on disk, fetch
 *    "last_id" from the OSTs and write the table out.
 * The error path unregisters the observer, disconnects the export and stores
 * ERR_PTR(rc) in mds_osc_obd. */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
317 struct mds_obd *mds = &obd->u.mds;
318 struct lustre_handle conn = {0,};
319 struct obd_connect_data *data;
323 if (IS_ERR(mds->mds_osc_obd))
324 RETURN(PTR_ERR(mds->mds_osc_obd));
326 if (mds->mds_osc_obd)
329 mds->mds_osc_obd = class_name2obd(lov_name);
330 if (!mds->mds_osc_obd) {
331 CERROR("MDS cannot locate LOV %s\n", lov_name);
332 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
336 OBD_ALLOC(data, sizeof(*data));
339 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341 OBD_CONNECT_OSS_CAPA;
342 #ifdef HAVE_LRU_RESIZE_SUPPORT
343 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
345 data->ocd_version = LUSTRE_VERSION_CODE;
346 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
347 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
348 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
/* the connect data is only needed for the call itself */
349 OBD_FREE(data, sizeof(*data));
351 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
352 mds->mds_osc_obd = ERR_PTR(rc);
355 mds->mds_osc_exp = class_conn2export(&conn);
357 rc = obd_register_observer(mds->mds_osc_obd, obd);
359 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
361 GOTO(err_discon, rc);
364 /* Deny new client connections until we are sure we have some OSTs */
365 obd->obd_no_conn = 1;
367 rc = mds_lov_read_objids(obd);
369 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
373 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
377 /* tgt_count may be 0! */
378 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
380 CERROR("failed to initialize catalog %d\n", rc);
384 /* If we're mounting this code for the first time on an existing FS,
385 * we need to populate the objids array from the real OST values */
386 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
/* value length is passed by address to obd_get_info(); use __u32 like the
 * other "last_id" lookup in this file rather than a signed int */
387 __u32 size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
388 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
389 "last_id", &size, mds->mds_lov_objids);
391 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
392 CWARN("got last object "LPU64" from OST %d\n",
393 mds->mds_lov_objids[i], i);
394 mds->mds_lov_objids_dirty = 1;
395 rc = mds_lov_write_objids(obd);
397 CERROR("got last objids from OSTs, but error "
398 "writing objids file: %d\n", rc);
402 /* I want to see a callback happen when the OBD moves to a
403 * "For General Use" state, and that's when we'll call
404 * set_nextid(). The class driver can help us here, because
405 * it can use the obd_recovering flag to determine when
406 * the OBD is fully available. */
407 /* MDD device will care about that
408 if (!obd->obd_recovering)
409 rc = mds_postrecov(obd);
414 obd_register_observer(mds->mds_osc_obd, NULL);
416 obd_disconnect(mds->mds_osc_exp);
417 mds->mds_osc_exp = NULL;
418 mds->mds_osc_obd = ERR_PTR(rc);
/* Stop observing the LOV.  When a LOV is connected (mds_osc_obd is not an
 * error pointer and an export exists) this unregisters the MDS as observer
 * and sets obd_force on the LOV obd.  The export itself is not released
 * here; see the comment in the body. */
422 int mds_lov_disconnect(struct obd_device *obd)
424 struct mds_obd *mds = &obd->u.mds;
428 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
429 obd_register_observer(mds->mds_osc_obd, NULL);
431 /* The actual disconnect of the mds_lov will be called from
432 * class_disconnect_exports from mds_lov_clean. So we have to
433 * ensure that class_cleanup doesn't fail due to the extra ref
434 * we're holding now. The mechanism to do that already exists -
435 * the obd_force flag. We'll drop the final ref to the
436 * mds_osc_exp in mds_cleanup. */
437 mds->mds_osc_obd->obd_force = 1;
/* ioctl dispatcher for the MDS obd.  karg is interpreted as a struct
 * obd_ioctl_data; the command selects one of the arms below.  Arms that touch
 * the backing filesystem or llogs bracket the work with push_ctxt()/pop_ctxt()
 * on an lvfs context.  Unknown commands are only logged at D_INFO in the part
 * visible here.
 * NOTE(review): several arms use the result of llog_get_context() without a
 * visible NULL check (ctxt->loc_exp is dereferenced) -- confirm it cannot be
 * NULL on these paths. */
443 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
444 void *karg, void *uarg)
446 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
447 struct obd_device *obd = exp->exp_obd;
448 struct mds_obd *mds = &obd->u.mds;
449 struct obd_ioctl_data *data = karg;
450 struct lvfs_run_ctxt saved;
454 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* start recording a config log named by ioc_inlbuf1; mds_cfg_llh is tested
 * first because it holds the log currently being recorded */
457 case OBD_IOC_RECORD: {
458 char *name = data->ioc_inlbuf1;
459 if (mds->mds_cfg_llh)
462 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
463 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
464 &mds->mds_cfg_llh, NULL, name);
466 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
469 mds->mds_cfg_llh = NULL;
470 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* finish recording: close the open config log and forget the handle */
475 case OBD_IOC_ENDRECORD: {
476 if (!mds->mds_cfg_llh)
479 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
480 rc = llog_close(mds->mds_cfg_llh);
481 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
483 mds->mds_cfg_llh = NULL;
/* open the named config log, destroy it and free the handle */
487 case OBD_IOC_CLEAR_LOG: {
488 char *name = data->ioc_inlbuf1;
489 if (mds->mds_cfg_llh)
492 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
493 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
494 &mds->mds_cfg_llh, NULL, name);
496 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
499 rc = llog_destroy(mds->mds_cfg_llh);
500 llog_free_handle(mds->mds_cfg_llh);
502 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
504 mds->mds_cfg_llh = NULL;
/* append one record to the config log being recorded; the payload is
 * ioc_plen1 bytes copied in from the user buffer ioc_pbuf1.
 * NOTE(review): the allocation size comes straight from the ioctl data --
 * confirm ioc_plen1 is bounded before it reaches this point. */
508 case OBD_IOC_DORECORD: {
510 struct llog_rec_hdr rec;
511 if (!mds->mds_cfg_llh)
514 rec.lrh_len = llog_data_len(data->ioc_plen1);
516 if (data->ioc_type == LUSTRE_CFG_TYPE) {
517 rec.lrh_type = OBD_CFG_REC;
519 CERROR("unknown cfg record type:%d \n", data->ioc_type);
523 OBD_ALLOC(cfg_buf, data->ioc_plen1);
526 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
528 OBD_FREE(cfg_buf, data->ioc_plen1);
532 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
533 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
535 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
537 OBD_FREE(cfg_buf, data->ioc_plen1);
/* parse (apply) the config log named by ioc_inlbuf1 */
541 case OBD_IOC_PARSE: {
542 struct llog_ctxt *ctxt =
543 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
544 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
546 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* dump the config log named by ioc_inlbuf1 */
553 case OBD_IOC_DUMP_LOG: {
554 struct llog_ctxt *ctxt =
555 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
558 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* sync the backing filesystem (the case label of this arm is not visible
 * in this view) */
566 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
567 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* start and commit a transaction on the root inode, sync, then switch the
 * backing device read-only */
571 case OBD_IOC_SET_READONLY: {
573 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
574 BDEVNAME_DECLARE_STORAGE(tmp);
575 CERROR("*** setting device %s read-only ***\n",
576 ll_bdevname(obd->u.obt.obt_sb, tmp));
578 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
580 rc = fsfilt_commit(obd, inode, handle, 1);
582 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
583 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
585 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* list llog catalogs, one per current LOV target */
589 case OBD_IOC_CATLOGLIST: {
590 int count = mds->mds_lov_desc.ld_tgt_count;
591 rc = llog_catalog_list(obd, count, data);
/* llog maintenance: tear the llogs down, run the llog ioctl, re-initialize
 * the catalogs and then re-announce this MDS's group to the OSCs */
595 case OBD_IOC_LLOG_CHECK:
596 case OBD_IOC_LLOG_CANCEL:
597 case OBD_IOC_LLOG_REMOVE: {
598 struct llog_ctxt *ctxt =
599 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
603 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
604 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
605 rc = llog_ioctl(ctxt, cmd, data);
606 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
607 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
608 group = FILTER_GROUP_MDS0 + mds->mds_id;
609 rc2 = obd_set_info_async(mds->mds_osc_exp,
610 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
611 sizeof(group), &group, NULL);
/* read-only llog queries: no teardown or re-initialization needed */
616 case OBD_IOC_LLOG_INFO:
617 case OBD_IOC_LLOG_PRINT: {
618 struct llog_ctxt *ctxt =
619 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
621 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
622 rc = llog_ioctl(ctxt, cmd, data);
623 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
/* administrator request to give up on recovery */
628 case OBD_IOC_ABORT_RECOVERY:
629 CERROR("aborting recovery for device %s\n", obd->obd_name);
630 target_stop_recovery_thread(obd);
634 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/* Argument bundle for the synchronize routines: filled in by
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize(). */
641 struct mds_lov_sync_info {
642 struct obd_device *mlsi_obd; /* the mds device doing the sync (its u.mds is used) */
643 struct obd_device *mlsi_watched; /* target osc */
644 __u32 mlsi_index; /* index of target */
/* Send both capability keys held in mds->mds_capa_keys (entries 0 and 1) to
 * the OSCs through mds_osc_exp under KEY_CAPA_KEY.  The key array pointer is
 * tested first; a failed send is reported through DEBUG_CAPA_KEY at D_ERROR.
 * NOTE(review): how a failure affects the loop and the return value is on
 * lines not visible in this view. */
647 static int mds_propagate_capa_keys(struct mds_obd *mds)
649 struct lustre_capa_key *key;
654 if (!mds->mds_capa_keys)
657 for (i = 0; i < 2; i++) {
658 key = &mds->mds_capa_keys[i];
659 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
661 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
662 KEY_CAPA_KEY, sizeof(*key), key, NULL);
664 DEBUG_CAPA_KEY(D_ERROR, key,
665 "propagate failed (rc = %d) for", rc);
673 /* We only sync one osc at a time, so that we don't have to hold
674 any kind of lock on the whole mds_lov_desc, which may change
675 (grow) as a result of mds_lov_add_ost. This also avoids any
676 kind of mismatch between the lov_desc and the mds_lov_desc,
677 which are not in lock-step during lov_add_obd */
/* Worker for one target.  data is a struct mds_lov_sync_info, which this
 * function frees.  Steps visible here, in order: update the MDS's view of
 * the target (mds_lov_update_mds), announce this MDS's group to the OSCs
 * (KEY_MDS_CONN), propagate the capability keys, connect the MDS->OST llog
 * context, clear orphans on the target unless the obd is stopping (-ENODEV),
 * and finally invoke the registered upcall, if any. */
678 static int __mds_lov_synchronize(void *data)
680 struct mds_lov_sync_info *mlsi = data;
681 struct obd_device *obd = mlsi->mlsi_obd;
682 struct obd_device *watched = mlsi->mlsi_watched;
683 struct mds_obd *mds = &obd->u.mds;
684 struct obd_uuid *uuid;
685 __u32 idx = mlsi->mlsi_index;
686 struct mds_group_info mgi;
/* all fields were copied into locals above, so the bundle can go now */
690 OBD_FREE(mlsi, sizeof(*mlsi));
694 uuid = &watched->u.cli.cl_target_uuid;
697 rc = mds_lov_update_mds(obd, watched, idx, uuid);
700 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
702 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
703 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
707 /* propagate capability keys */
708 rc = mds_propagate_capa_keys(mds);
712 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
713 mds->mds_lov_desc.ld_tgt_count,
717 CERROR("%s: failed at llog_origin_connect: %d\n",
722 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
723 obd->obd_name, obd_uuid2str(uuid));
725 * FIXME: this obd_stopping was useless,
726 * since obd in mdt layer was set
728 if (obd->obd_stopping)
729 GOTO(out, rc = -ENODEV);
731 rc = mds_lov_clear_orphans(mds, uuid);
733 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
738 if (obd->obd_upcall.onu_owner) {
740 * This is a hack for mds_notify->mdd_notify. When the mds obd
741 * in mdd is removed, This hack should be removed.
743 LASSERT(obd->obd_upcall.onu_upcall != NULL);
744 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
745 obd->obd_upcall.onu_owner);
/* Thread entry point used for background synchronization: picks a thread
 * name ("ll_sync_tgt" when the index is unknown, otherwise "ll_sync_NN"),
 * daemonizes under that name and then runs __mds_lov_synchronize() on the
 * same argument.
 * NOTE(review): one branch uses sprintf() and the other snprintf(); the
 * declaration of name is not visible here -- confirm it is large enough for
 * the fixed string, or use snprintf() in both branches. */
753 int mds_lov_synchronize(void *data)
755 struct mds_lov_sync_info *mlsi = data;
758 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
759 /* There is still a watched target,
760 but we don't know its index */
761 sprintf(name, "ll_sync_tgt");
763 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
764 ptlrpc_daemonize(name);
766 RETURN(__mds_lov_synchronize(data));
/* Package (obd, watched, index) into a freshly allocated
 * mds_lov_sync_info and run the synchronization either in a new kernel
 * thread (mds_lov_synchronize) or inline (__mds_lov_synchronize).  data, when
 * used, is read as a pointer to the __u32 target index; otherwise the index
 * is MDSLOV_NO_INDEX.
 * NOTE(review): __mds_lov_synchronize() frees the mlsi it is given.  The
 * CDEBUG after cfs_kernel_thread() reads mlsi->mlsi_index, which may already
 * have been freed by the new thread; consider copying the index to a local
 * before starting the thread.  Also confirm that mlsi is released when the
 * thread fails to start. */
769 int mds_lov_start_synchronize(struct obd_device *obd,
770 struct obd_device *watched,
771 void *data, int nonblock)
773 struct mds_lov_sync_info *mlsi;
780 OBD_ALLOC(mlsi, sizeof(*mlsi));
784 mlsi->mlsi_obd = obd;
785 mlsi->mlsi_watched = watched;
787 mlsi->mlsi_index = *(__u32 *)data;
789 mlsi->mlsi_index = MDSLOV_NO_INDEX;
791 /* Although class_export_get(obd->obd_self_export) would lock
792 the MDS in place, since it's only a self-export
793 it doesn't lock the LOV in place. The LOV can be disconnected
794 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
795 Simply taking an export ref on the LOV doesn't help, because it's
796 still disconnected. Taking an obd reference insures that we don't
797 disconnect the LOV. This of course means a cleanup won't
798 finish for as long as the sync is blocking. */
802 /* Synchronize in the background */
803 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
804 CLONE_VM | CLONE_FILES);
806 CERROR("%s: error starting mds_lov_synchronize: %d\n",
810 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
811 "thread=%d\n", obd->obd_name,
812 mlsi->mlsi_index, rc);
/* blocking variant: run the worker in the caller's context */
816 rc = __mds_lov_synchronize((void *)mlsi);
/* Observer callback invoked when a watched device changes state.
 * OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK are
 * processed below; OBD_NOTIFY_CONFIG re-opens the MDS for client connections
 * (clears obd_no_conn).  The watched device must be of the OSC type,
 * otherwise the notification is reported with CERROR.
 * While the MDS is recovering only the lov descriptor and the llog catalogs
 * are refreshed.  Otherwise a synchronization is started for the watched
 * target -- blocking only for OBD_NOTIFY_SYNC -- followed by
 * lquota_recovery(). */
822 int mds_notify(struct obd_device *obd, struct obd_device *watched,
823 enum obd_notify_event ev, void *data)
829 /* We only handle these: */
830 case OBD_NOTIFY_ACTIVE:
831 case OBD_NOTIFY_SYNC:
832 case OBD_NOTIFY_SYNC_NONBLOCK:
834 case OBD_NOTIFY_CONFIG:
835 /* Open for clients */
836 obd->obd_no_conn = 0;
841 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
842 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
843 CERROR("unexpected notification of %s %s!\n",
844 watched->obd_type->typ_name, watched->obd_name);
848 if (obd->obd_recovering) {
849 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
851 obd_uuid2str(&watched->u.cli.cl_target_uuid));
852 /* We still have to fix the lov descriptor for ost's added
853 after the mdt in the config log. They didn't make it into
855 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
858 /* We should update init llog here too for replay unlink and
859 * possible llog init race when recovery complete */
860 mutex_down(&obd->obd_dev_sem);
861 llog_cat_initialize(obd, NULL,
862 obd->u.mds.mds_lov_desc.ld_tgt_count,
863 &watched->u.cli.cl_target_uuid);
864 mutex_up(&obd->obd_dev_sem);
/* the MDS->OST llog context must exist before a sync is started */
868 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
869 rc = mds_lov_start_synchronize(obd, watched, data,
870 !(ev == OBD_NOTIFY_SYNC));
872 lquota_recovery(mds_quota_interface_ref, obd);
877 /* Convert the on-disk LOV EA structure.
878 * We always try to convert from an old LOV EA format to the common in-memory
879 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
880 * then convert back to the new on-disk format and save it back to disk
881 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
882 * to convert it each time this inode is accessed.
884 * This function is a bit interesting in the error handling. We can safely
885 * ship the old lmm to the client in case of failure, since it uses the same
886 * obd_unpackmd() code and can do the conversion if the MDS fails for some
887 * reason. We will not delete the old lmm data until we have written the
888 * new format lmm data in fsfilt_set_md(). */
/* lmm / lmm_size describe the EA as read from disk (little-endian fields).
 * Nothing is converted when the magic is already LOV_MAGIC or
 * LOV_MAGIC_JOIN.  Otherwise the EA is unpacked, re-packed, and written back
 * to the inode inside an fsfilt transaction; the in-memory lsm is released
 * before returning.  The result is an error code, or lmm_size as set in the
 * final assignment below. */
889 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
890 struct lov_mds_md *lmm, int lmm_size)
892 struct lov_stripe_md *lsm = NULL;
/* Both magics must be byte-swapped before the comparison.  The second test
 * used to swap the *result* of the comparison instead, which compared the raw
 * on-disk value and was only correct on little-endian hosts. */
897 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
898 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
901 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
902 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
/* old on-disk format -> in-memory lsm */
905 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
/* in-memory lsm -> current on-disk format (lmm is replaced) */
909 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
914 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
915 if (IS_ERR(handle)) {
916 rc = PTR_ERR(handle);
920 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
/* the transaction is committed whatever fsfilt_set_md() returned */
922 err = fsfilt_commit(obd, inode, handle, 0);
/* a commit error takes precedence over the size result */
924 rc = err ? err : lmm_size;
927 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
/* Scatter the object ids of a striping EA into an id table: for every stripe
 * in lmm, ids[<stripe's OST index>] is set to that stripe's object id (both
 * fields are converted from little-endian).  desc is not used in the lines
 * visible here.
 * NOTE(review): the OST index taken from the EA is not range-checked, so ids
 * must be large enough for every index that can appear -- confirm at the
 * callers. */
932 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
933 struct lov_desc *desc)
936 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
937 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
938 le64_to_cpu(lmm->lmm_objects[i].l_object_id);
941 EXPORT_SYMBOL(mds_objids_from_lmm);