1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/* Raise the MDS's cached per-target last-object ids.
 *
 * For each of the mds_lov_desc.ld_tgt_count targets, ids[i] replaces the
 * cached mds_lov_objids[i] only when it is larger, and the array is then
 * marked dirty.  Cached entries are never lowered here.
 *
 * obd: the MDS device whose u.mds state is updated.
 * ids: candidate ids, indexed the same way as mds_lov_objids. */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52 if (ids[i] > (mds->mds_lov_objids)[i]) {
53 (mds->mds_lov_objids)[i] = ids[i];
54 mds->mds_lov_objids_dirty = 1;
59 EXPORT_SYMBOL(mds_lov_update_objids);
/* Load the saved per-target last-object ids from mds_lov_objid_filp into
 * mds->mds_lov_objids.
 *
 * Expects to run before any ids are cached: mds_lov_objids_size and
 * mds_lov_objids_dirty are both asserted to be zero.  The CERROR below
 * reports a failed read together with its return code. */
61 static int mds_lov_read_objids(struct obd_device *obd)
63 struct mds_obd *mds = &obd->u.mds;
69 LASSERT(!mds->mds_lov_objids_size);
70 LASSERT(!mds->mds_lov_objids_dirty);
72 /* Read everything in the file, even if our current lov desc
73 has fewer targets. Old targets not in the lov descriptor
74 during mds setup may still have valid objids. */
75 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Publish the buffer and its size in bytes on the mds before filling it. */
82 mds->mds_lov_objids = ids;
83 mds->mds_lov_objids_size = size;
85 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
87 CERROR("Error reading objids %d\n", rc);
/* Number of ids the file held: its byte size divided by the entry size. */
91 mds->mds_lov_objids_in_file = size / sizeof(*ids);
93 for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95 mds->mds_lov_objids[i], i);
/* Write the cached last-object ids back to mds_lov_objid_filp.
 *
 * The write is driven by mds_lov_objids_dirty, which is tested first and
 * reset to 0 after the write call.
 * NOTE(review): the statements guarded by the dirty test, and any check of
 * rc before the flag is reset, are not visible in this copy - confirm. */
100 int mds_lov_write_objids(struct obd_device *obd)
102 struct mds_obd *mds = &obd->u.mds;
107 if (!mds->mds_lov_objids_dirty)
/* Cover both the current descriptor and any extra entries that were read
 * from the file, so ids of targets missing from the descriptor are kept. */
110 tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
115 for (i = 0; i < tgts; i++)
116 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117 mds->mds_lov_objids[i], i);
119 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120 mds->mds_lov_objids, tgts * sizeof(obd_id),
123 mds->mds_lov_objids_dirty = 0;
129 EXPORT_SYMBOL(mds_lov_write_objids);
/* Ask the OSTs behind mds_osc_exp to reconcile their objects with the MDS
 * by issuing an obd_create() flagged OBD_FL_DELORPHAN.
 *
 * mds:      MDS state; mds_lov_objids must already be set (asserted).
 * ost_uuid: when non-NULL, the uuid is copied into the request's inline
 *           data and OBD_MD_FLINLINE is set; when NULL no uuid is sent.
 *           NOTE(review): presumably this restricts the request to one
 *           OST - confirm against the LOV create path. */
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
135 struct obd_trans_info oti = {0};
136 struct lov_stripe_md *empty_ea = NULL;
139 LASSERT(mds->mds_lov_objids != NULL);
141 /* This create will in fact either create or destroy: If the OST is
142 * missing objects below this ID, they will be created. If it finds
143 * objects above this ID, they will be removed. */
144 memset(&oa, 0, sizeof(oa));
145 oa.o_flags = OBD_FL_DELORPHAN;
/* The object group is derived from this MDS's id. */
146 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148 if (ost_uuid != NULL) {
149 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150 oa.o_valid |= OBD_MD_FLINLINE;
152 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
157 /* update the LOV-OSC knowledge of the last used object id's */
/* Sends the whole cached id array (ld_tgt_count entries) through
 * obd_set_info_async() on mds_osc_exp under the KEY_NEXT_ID key.
 * Must not be called while the device is recovering, and requires
 * mds_lov_objids to be set; both conditions are asserted.  A failure is
 * logged with CERROR. */
158 int mds_lov_set_nextid(struct obd_device *obd)
160 struct mds_obd *mds = &obd->u.mds;
164 LASSERT(!obd->obd_recovering);
166 LASSERT(mds->mds_lov_objids != NULL);
168 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
170 mds->mds_lov_desc.ld_tgt_count *
171 sizeof(*mds->mds_lov_objids),
172 mds->mds_lov_objids, NULL);
175 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
181 /* Update the lov desc for a new size lov. */
/* Fetches the current LOV descriptor from the lov export with the
 * KEY_LOVDESC key into a temporary buffer, grows mds_lov_objids when the
 * target count no longer fits, copies the descriptor into
 * mds->mds_lov_desc, recomputes mds_max_mdsize and mds_max_cookiesize,
 * re-initializes the llog catalogs and finally calls the registered
 * upcall, if any.  The temporary descriptor is freed before returning.
 *
 * obd: the MDS device.
 * lov: export the descriptor is queried from. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
184 struct mds_obd *mds = &obd->u.mds;
186 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
190 OBD_ALLOC(ld, sizeof(*ld));
194 rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
199 /* The size of the LOV target table may have increased. */
200 size = ld->ld_tgt_count * sizeof(obd_id);
/* Reallocate when nothing is cached yet or the needed bytes exceed the
 * current allocation. */
201 if ((mds->mds_lov_objids_size == 0) ||
202 (size > mds->mds_lov_objids_size)) {
205 /* add room by powers of 2 */
/* NOTE(review): the loop initializer and body are not visible in this
 * copy; inside the loop size is compared against an entry count and is
 * only converted to bytes afterwards. */
207 while (size < ld->ld_tgt_count)
209 size = size * sizeof(obd_id);
211 OBD_ALLOC(ids, size);
213 GOTO(out, rc = -ENOMEM);
214 memset(ids, 0, size);
/* Carry the old ids over into the larger array, then free the old one
 * using its old byte size (mds_lov_objids_size is updated afterwards). */
215 if (mds->mds_lov_objids_size) {
216 obd_id *old_ids = mds->mds_lov_objids;
217 memcpy(ids, mds->mds_lov_objids,
218 mds->mds_lov_objids_size);
219 mds->mds_lov_objids = ids;
220 OBD_FREE(old_ids, mds->mds_lov_objids_size);
222 mds->mds_lov_objids = ids;
223 mds->mds_lov_objids_size = size;
/* NOTE(review): in this copy the comment opened on the next line has no
 * terminator of its own.  It runs until the end of the "reconnect the
 * llogs" comment further down, so the statements in between (the
 * mds_lov_desc assignment and the max_mdsize / max_cookiesize update) are
 * lexically part of that comment.  Restore the missing terminator from
 * the upstream file. */
226 /* Don't change the mds_lov_desc until the objids size matches the
228 mds->mds_lov_desc = *ld;
229 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230 mds->mds_lov_desc.ld_tgt_count);
232 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233 max(mds->mds_lov_desc.ld_tgt_count,
234 mds->mds_lov_objids_in_file));
235 mds->mds_max_mdsize = lov_mds_md_size(stripes);
236 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
241 /* If we added a target we have to reconnect the llogs */
242 /* We only _need_ to do this at first add (idx), or the first time
243 after recovery. However, it should now be safe to call anytime. */
/* The catalog re-initialization runs under obd_dev_sem. */
244 mutex_down(&obd->obd_dev_sem);
245 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246 mutex_up(&obd->obd_dev_sem);
248 /*XXX this notifies the MDD until lov handling use old mds code */
249 if (obd->obd_upcall.onu_owner) {
250 LASSERT(obd->obd_upcall.onu_upcall != NULL);
251 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252 obd->obd_upcall.onu_owner);
255 OBD_FREE(ld, sizeof(*ld));
/* Index value meaning "the target's index is not known". */
260 #define MDSLOV_NO_INDEX -1
262 /* Inform MDS about new/updated target */
/* Refreshes the MDS copy of the LOV descriptor and then, when an index
 * was supplied and the device is not recovering, brings the last-object id
 * for that target in line:
 *  - index at or beyond what was read from the objid file: ask the watched
 *    OSC for "last_id", cache it, mark the array dirty and write it out;
 *  - otherwise push the ids already read from disk via mds_lov_set_nextid().
 *
 * obd:     the MDS device.
 * watched: device whose self export is queried for "last_id".
 * idx:     target index, or MDSLOV_NO_INDEX.
 * uuid:    target uuid; not referenced by the lines visible here. */
263 static int mds_lov_update_mds(struct obd_device *obd,
264 struct obd_device *watched,
265 __u32 idx, struct obd_uuid *uuid)
267 struct mds_obd *mds = &obd->u.mds;
/* Remember the count from before the update; it is only used in the
 * debug message below. */
272 old_count = mds->mds_lov_desc.ld_tgt_count;
273 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
277 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279 mds->mds_lov_desc.ld_tgt_count);
281 /* idx is set as data from lov_notify. */
282 if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
/* An index outside the refreshed descriptor is reported as an error. */
283 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284 CERROR("index %d > count %d!\n", idx,
285 mds->mds_lov_desc.ld_tgt_count);
289 if (idx >= mds->mds_lov_objids_in_file) {
290 /* We never read this lastid; ask the osc */
292 __u32 size = sizeof(lastid);
293 rc = obd_get_info(watched->obd_self_export,
295 "last_id", &size, &lastid);
298 mds->mds_lov_objids[idx] = lastid;
299 mds->mds_lov_objids_dirty = 1;
300 mds_lov_write_objids(obd);
302 /* We have read this lastid from disk; tell the osc.
303 Don't call this during recovery. */
304 rc = mds_lov_set_nextid(obd);
307 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308 mds->mds_lov_objids[idx], idx);
314 /* Connect the MDS to its LOV and bring the cached object ids up to date. */
/* Looks up the LOV device named lov_name, connects to it, registers the
 * MDS as its observer, blocks new client connections, loads the saved
 * object ids, refreshes the LOV descriptor, initializes the llog catalogs
 * and, when the descriptor lists more targets than the objid file held,
 * fetches "last_id" from the OSTs and writes the objid file.
 *
 * mds_osc_obd doubles as connection state: an ERR_PTR value records an
 * earlier failure and is returned as its error code; a non-NULL pointer
 * means a device was already looked up.
 *
 * obd:      the MDS device.
 * lov_name: name of the LOV device to connect to.
 *
 * NOTE(review): the comment opened by "MDD device will care about that"
 * near the end has no terminator in this copy, so the error-path lines
 * after it are lexically inside it.  Restore it from the upstream file. */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
317 struct mds_obd *mds = &obd->u.mds;
318 struct lustre_handle conn = {0,};
319 struct obd_connect_data *data;
323 if (IS_ERR(mds->mds_osc_obd))
324 RETURN(PTR_ERR(mds->mds_osc_obd));
326 if (mds->mds_osc_obd)
329 mds->mds_osc_obd = class_name2obd(lov_name);
330 if (!mds->mds_osc_obd) {
331 CERROR("MDS cannot locate LOV %s\n", lov_name);
332 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
336 OBD_ALLOC(data, sizeof(*data));
339 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341 OBD_CONNECT_OSS_CAPA;
342 #ifdef HAVE_LRU_RESIZE_SUPPORT
343 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
345 data->ocd_version = LUSTRE_VERSION_CODE;
346 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
347 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
348 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
/* The connect data is only needed for the call itself. */
349 OBD_FREE(data, sizeof(*data));
351 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
352 mds->mds_osc_obd = ERR_PTR(rc);
355 mds->mds_osc_exp = class_conn2export(&conn);
357 rc = obd_register_observer(mds->mds_osc_obd, obd);
359 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
361 GOTO(err_discon, rc);
364 /* Deny new client connections until we are sure we have some OSTs */
365 obd->obd_no_conn = 1;
367 rc = mds_lov_read_objids(obd);
369 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
373 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
377 /* tgt_count may be 0! */
378 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
380 CERROR("failed to initialize catalog %d\n", rc);
384 /* If we're mounting this code for the first time on an existing FS,
385 * we need to populate the objids array from the real OST values */
386 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
/* The value length is passed by address to obd_get_info(); the other
 * callers in this file pass a __u32 there, so use the same type. */
387 __u32 size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
388 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
389 "last_id", &size, mds->mds_lov_objids);
391 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
392 CWARN("got last object "LPU64" from OST %d\n",
393 mds->mds_lov_objids[i], i);
394 mds->mds_lov_objids_dirty = 1;
395 rc = mds_lov_write_objids(obd);
397 CERROR("got last objids from OSTs, but error "
398 "writing objids file: %d\n", rc);
402 /* I want to see a callback happen when the OBD moves to a
403 * "For General Use" state, and that's when we'll call
404 * set_nextid(). The class driver can help us here, because
405 * it can use the obd_recovering flag to determine when
406 * the OBD is fully available. */
407 /* MDD device will care about that
408 if (!obd->obd_recovering)
409 rc = mds_postrecov(obd);
414 obd_register_observer(mds->mds_osc_obd, NULL);
416 obd_disconnect(mds->mds_osc_exp);
417 mds->mds_osc_exp = NULL;
418 mds->mds_osc_obd = ERR_PTR(rc);
/* Detach the MDS from its LOV ahead of cleanup.
 *
 * When a LOV device was recorded without error (mds_osc_obd is not an
 * ERR_PTR) and an export exists, the MDS is unregistered as the LOV's
 * observer and obd_force is set on the LOV device.  The export itself is
 * not released by the lines visible here. */
422 int mds_lov_disconnect(struct obd_device *obd)
424 struct mds_obd *mds = &obd->u.mds;
428 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
429 obd_register_observer(mds->mds_osc_obd, NULL);
431 /* The actual disconnect of the mds_lov will be called from
432 * class_disconnect_exports from mds_lov_clean. So we have to
433 * ensure that class_cleanup doesn't fail due to the extra ref
434 * we're holding now. The mechanism to do that already exists -
435 * the obd_force flag. We'll drop the final ref to the
436 * mds_osc_exp in mds_cleanup. */
437 mds->mds_osc_obd->obd_force = 1;
/* ioctl entry point for the MDS device; dispatches on cmd.
 *
 * cmd:  ioctl command code.
 * exp:  export the request arrived on; the device is exp->exp_obd.
 * karg: request data, used as a struct obd_ioctl_data.
 * len, uarg: not referenced by the lines visible here.
 *
 * Commands that operate on logs bracket the work with push_ctxt() and
 * pop_ctxt() on an lvfs context.  mds->mds_cfg_llh holds the config log
 * handle between OBD_IOC_RECORD and OBD_IOC_ENDRECORD. */
443 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
444 void *karg, void *uarg)
446 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
447 struct obd_device *obd = exp->exp_obd;
448 struct mds_obd *mds = &obd->u.mds;
449 struct obd_ioctl_data *data = karg;
450 struct lvfs_run_ctxt saved;
454 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config log named by ioc_inlbuf1 and keep
 * its handle in mds_cfg_llh.  The handle is tested first, so an already
 * open recording is treated specially (outcome not visible here). */
457 case OBD_IOC_RECORD: {
458 char *name = data->ioc_inlbuf1;
459 if (mds->mds_cfg_llh)
462 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
463 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
464 &mds->mds_cfg_llh, NULL, name);
466 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
469 mds->mds_cfg_llh = NULL;
470 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the open config log and forget its handle. */
475 case OBD_IOC_ENDRECORD: {
476 if (!mds->mds_cfg_llh)
479 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
480 rc = llog_close(mds->mds_cfg_llh);
481 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
483 mds->mds_cfg_llh = NULL;
/* Clear a log: open the log named by ioc_inlbuf1 through mds_cfg_llh,
 * destroy it, free the handle and reset mds_cfg_llh. */
487 case OBD_IOC_CLEAR_LOG: {
488 char *name = data->ioc_inlbuf1;
489 if (mds->mds_cfg_llh)
492 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
493 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
494 &mds->mds_cfg_llh, NULL, name);
496 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
499 rc = llog_destroy(mds->mds_cfg_llh);
500 llog_free_handle(mds->mds_cfg_llh);
502 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
504 mds->mds_cfg_llh = NULL;
/* Append one record to the open config log.  The payload of ioc_plen1
 * bytes is copied from the user buffer ioc_pbuf1 into a temporary kernel
 * buffer, which is freed on both the copy-failure and the normal path.
 * Only LUSTRE_CFG_TYPE records are accepted. */
508 case OBD_IOC_DORECORD: {
510 struct llog_rec_hdr rec;
511 if (!mds->mds_cfg_llh)
514 rec.lrh_len = llog_data_len(data->ioc_plen1);
516 if (data->ioc_type == LUSTRE_CFG_TYPE) {
517 rec.lrh_type = OBD_CFG_REC;
519 CERROR("unknown cfg record type:%d \n", data->ioc_type);
523 OBD_ALLOC(cfg_buf, data->ioc_plen1);
526 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
528 OBD_FREE(cfg_buf, data->ioc_plen1);
532 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
533 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
535 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
537 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse the config log named by ioc_inlbuf1. */
541 case OBD_IOC_PARSE: {
542 struct llog_ctxt *ctxt =
543 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
544 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
546 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config log named by ioc_inlbuf1. */
553 case OBD_IOC_DUMP_LOG: {
554 struct llog_ctxt *ctxt =
555 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
558 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem.  NOTE(review): the case label for this
 * branch is not visible in this copy. */
566 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
567 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Switch the backing device to read-only: start and commit a transaction
 * on the root inode, sync the filesystem, then call lvfs_set_rdonly(). */
571 case OBD_IOC_SET_READONLY: {
573 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
574 BDEVNAME_DECLARE_STORAGE(tmp);
575 CERROR("*** setting device %s read-only ***\n",
576 ll_bdevname(obd->u.obt.obt_sb, tmp));
578 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
580 rc = fsfilt_commit(obd, inode, handle, 1);
582 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
583 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
585 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List the llog catalogs, one slot per LOV target. */
589 case OBD_IOC_CATLOGLIST: {
590 int count = mds->mds_lov_desc.ld_tgt_count;
591 rc = llog_catalog_list(obd, count, data);
/* Log maintenance: the llogs are shut down with obd_llog_finish(), the
 * request is handed to llog_ioctl(), the catalogs are re-initialized and
 * the MDS group is announced again with KEY_MDS_CONN.  The result of that
 * last call is kept separately in rc2. */
595 case OBD_IOC_LLOG_CHECK:
596 case OBD_IOC_LLOG_CANCEL:
597 case OBD_IOC_LLOG_REMOVE: {
598 struct llog_ctxt *ctxt =
599 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
603 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
604 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
605 rc = llog_ioctl(ctxt, cmd, data);
606 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
607 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
608 group = FILTER_GROUP_MDS0 + mds->mds_id;
609 rc2 = obd_set_info_async(mds->mds_osc_exp,
610 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
611 sizeof(group), &group, NULL);
/* Log queries: handed to llog_ioctl() in the lvfs context of the device
 * behind the llog context's export. */
616 case OBD_IOC_LLOG_INFO:
617 case OBD_IOC_LLOG_PRINT: {
618 struct llog_ctxt *ctxt =
619 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
621 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
622 rc = llog_ioctl(ctxt, cmd, data);
623 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
/* Abort recovery by stopping the recovery thread. */
628 case OBD_IOC_ABORT_RECOVERY:
629 CERROR("aborting recovery for device %s\n", obd->obd_name);
630 target_stop_recovery_thread(obd);
/* Any other command is only logged at D_INFO level here. */
634 CDEBUG(D_INFO, "unknown command %x\n", cmd);
641 /* Collect the preconditions we need to allow client connects */
/* Records which preconditions have been met and clears obd_no_conn once
 * the config-log one is satisfied.
 *
 * flag: bit mask; CONFIG_LOG sets mds_fl_cfglog, CONFIG_SYNC sets
 *       mds_fl_synced.  The synced flag is recorded but, with the check
 *       commented out below (bz11778), does not gate client connections. */
642 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
644 if (flag & CONFIG_LOG)
645 obd->u.mds.mds_fl_cfglog = 1;
646 if (flag & CONFIG_SYNC)
647 obd->u.mds.mds_fl_synced = 1;
648 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
649 /* Open for clients */
650 obd->obd_no_conn = 0;
/* Argument bundle handed to the synchronize routines.  It is allocated in
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize(). */
653 struct mds_lov_sync_info {
654 struct obd_device *mlsi_obd; /* the lov device to sync */
655 struct obd_device *mlsi_watched; /* target osc */
656 __u32 mlsi_index; /* index of target */
/* Send the MDS capability keys to the OSTs.
 *
 * Each of the two entries of mds->mds_capa_keys is passed to
 * obd_set_info_async() on mds_osc_exp under the KEY_CAPA_KEY key; a
 * failure is reported with DEBUG_CAPA_KEY at D_ERROR level.  The key
 * array is tested first, so a device without keys is handled separately
 * (outcome not visible here). */
659 static int mds_propagate_capa_keys(struct mds_obd *mds)
661 struct lustre_capa_key *key;
666 if (!mds->mds_capa_keys)
669 for (i = 0; i < 2; i++) {
670 key = &mds->mds_capa_keys[i];
671 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
673 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
674 KEY_CAPA_KEY, sizeof(*key), key, NULL);
676 DEBUG_CAPA_KEY(D_ERROR, key,
677 "propagate failed (rc = %d) for", rc);
685 /* We only sync one osc at a time, so that we don't have to hold
686 any kind of lock on the whole mds_lov_desc, which may change
687 (grow) as a result of mds_lov_add_ost. This also avoids any
688 kind of mismatch between the lov_desc and the mds_lov_desc,
689 which are not in lock-step during lov_add_obd */
/* Bring one target in sync with the MDS.
 *
 * data is a struct mds_lov_sync_info; its fields are copied into locals
 * and the structure is freed before the work starts.  The visible steps,
 * in order: update the MDS view of the target (mds_lov_update_mds),
 * announce the MDS group with KEY_MDS_CONN, propagate the capability
 * keys, connect the MDS-OST llog, clear orphans on the target and call
 * the registered upcall, if any.  A device that is stopping bails out
 * with -ENODEV before orphans are cleared. */
690 static int __mds_lov_synchronize(void *data)
692 struct mds_lov_sync_info *mlsi = data;
693 struct obd_device *obd = mlsi->mlsi_obd;
694 struct obd_device *watched = mlsi->mlsi_watched;
695 struct mds_obd *mds = &obd->u.mds;
696 struct obd_uuid *uuid;
697 __u32 idx = mlsi->mlsi_index;
698 struct mds_group_info mgi;
/* Everything needed was copied out above, so the bundle can go now. */
702 OBD_FREE(mlsi, sizeof(*mlsi));
706 uuid = &watched->u.cli.cl_target_uuid;
709 rc = mds_lov_update_mds(obd, watched, idx, uuid);
711 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
714 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
717 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
718 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
722 /* propagate capability keys */
723 rc = mds_propagate_capa_keys(mds);
727 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
728 mds->mds_lov_desc.ld_tgt_count,
732 CERROR("%s: failed at llog_origin_connect: %d\n",
737 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
738 obd->obd_name, obd_uuid2str(uuid));
740 * FIXME: this obd_stopping was useless,
741 * since obd in mdt layer was set
743 if (obd->obd_stopping)
744 GOTO(out, rc = -ENODEV);
746 rc = mds_lov_clear_orphans(mds, uuid);
748 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
749 obd_uuid2str(uuid), rc);
753 if (obd->obd_upcall.onu_owner) {
755 * This is a hack for mds_notify->mdd_notify. When the mds obd
756 * in mdd is removed, This hack should be removed.
758 LASSERT(obd->obd_upcall.onu_upcall != NULL);
759 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
760 obd->obd_upcall.onu_owner);
/* Thread entry point for a background synchronize.
 *
 * Names the thread after the target ("ll_sync_tgt" when the index is
 * MDSLOV_NO_INDEX, otherwise "ll_sync_" followed by the two-digit index),
 * daemonizes it and then runs __mds_lov_synchronize() on the same
 * argument, returning its result.
 *
 * data: struct mds_lov_sync_info describing the target. */
768 int mds_lov_synchronize(void *data)
770 struct mds_lov_sync_info *mlsi = data;
773 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
774 /* There is still a watched target,
775 but we don't know its index */
/* Bounded like the indexed name below, so both writes into name are
 * limited by the buffer size. */
776 snprintf(name, sizeof(name), "ll_sync_tgt");
778 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
779 ptlrpc_daemonize(name);
781 RETURN(__mds_lov_synchronize(data));
/* Package a synchronize request and run it.
 *
 * Allocates a struct mds_lov_sync_info, fills in the MDS device, the
 * watched device and the target index, then runs the synchronize either
 * in a new kernel thread (mds_lov_synchronize) or directly in the caller
 * (__mds_lov_synchronize).
 *
 * obd:      the MDS device.
 * watched:  the target device to synchronize.
 * data:     read as a __u32 target index when used; otherwise the index is
 *           set to MDSLOV_NO_INDEX.
 * nonblock: NOTE(review): presumably selects the background thread; the
 *           test itself is not visible in this copy - confirm. */
784 int mds_lov_start_synchronize(struct obd_device *obd,
785 struct obd_device *watched,
786 void *data, int nonblock)
788 struct mds_lov_sync_info *mlsi;
795 OBD_ALLOC(mlsi, sizeof(*mlsi));
799 mlsi->mlsi_obd = obd;
800 mlsi->mlsi_watched = watched;
802 mlsi->mlsi_index = *(__u32 *)data;
804 mlsi->mlsi_index = MDSLOV_NO_INDEX;
806 /* Although class_export_get(obd->obd_self_export) would lock
807 the MDS in place, since it's only a self-export
808 it doesn't lock the LOV in place. The LOV can be disconnected
809 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
810 Simply taking an export ref on the LOV doesn't help, because it's
811 still disconnected. Taking an obd reference ensures that we don't
812 disconnect the LOV. This of course means a cleanup won't
813 finish for as long as the sync is blocking. */
817 /* Synchronize in the background */
818 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
819 CLONE_VM | CLONE_FILES);
821 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* On success rc is what cfs_kernel_thread() returned; it is logged as the
 * thread id. */
825 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
826 "thread=%d\n", obd->obd_name,
827 mlsi->mlsi_index, rc);
/* Synchronous path: run in the caller's context. */
831 rc = __mds_lov_synchronize((void *)mlsi);
/* Observer callback: react to an event on a watched device.
 *
 * obd:     the MDS device.
 * watched: the device the event is about; anything other than an OSC is
 *          reported as unexpected.
 * ev:      the event.  OBD_NOTIFY_CONFIG hands data to mds_allow_cli() as
 *          a flag mask; OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and
 *          OBD_NOTIFY_SYNC_NONBLOCK lead to a synchronize.
 * data:    event payload: a flag value for OBD_NOTIFY_CONFIG, otherwise
 *          passed on to mds_lov_start_synchronize().
 *
 * While the MDS is recovering no orphans are reset; the llog catalogs are
 * re-initialized under obd_dev_sem and CONFIG_SYNC is recorded.  Outside
 * recovery the synchronize blocks only for OBD_NOTIFY_SYNC, and quota
 * recovery is started afterwards.
 *
 * NOTE(review): the comment opened by "We still have to fix the lov
 * descriptor" has no terminator of its own in this copy, so the
 * mds_lov_update_desc() call after it is lexically inside the comment.
 * Restore it from the upstream file. */
837 int mds_notify(struct obd_device *obd, struct obd_device *watched,
838 enum obd_notify_event ev, void *data)
844 /* We only handle these: */
845 case OBD_NOTIFY_ACTIVE:
846 case OBD_NOTIFY_SYNC:
847 case OBD_NOTIFY_SYNC_NONBLOCK:
849 case OBD_NOTIFY_CONFIG:
/* data carries a small flag value in a pointer; convert through unsigned
 * long so the pointer is never cast directly to a narrower integer. */
850 mds_allow_cli(obd, (unsigned int)(unsigned long)data);
855 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
856 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
857 CERROR("unexpected notification of %s %s!\n",
858 watched->obd_type->typ_name, watched->obd_name);
862 if (obd->obd_recovering) {
863 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
865 obd_uuid2str(&watched->u.cli.cl_target_uuid));
866 /* We still have to fix the lov descriptor for ost's added
867 after the mdt in the config log. They didn't make it into
869 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
872 /* We should update init llog here too for replay unlink and
873 * possible llog init race when recovery complete */
874 mutex_down(&obd->obd_dev_sem);
875 llog_cat_initialize(obd, NULL,
876 obd->u.mds.mds_lov_desc.ld_tgt_count,
877 &watched->u.cli.cl_target_uuid);
878 mutex_up(&obd->obd_dev_sem);
879 mds_allow_cli(obd, CONFIG_SYNC);
883 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
884 rc = mds_lov_start_synchronize(obd, watched, data,
885 !(ev == OBD_NOTIFY_SYNC));
887 lquota_recovery(mds_quota_interface_ref, obd);
892 /* Convert the on-disk LOV EA structure.
893 * We always try to convert from an old LOV EA format to the common in-memory
894 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
895 * then convert back to the new on-disk format and save it back to disk
896 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
897 * to convert it each time this inode is accessed.
899 * This function is a bit interesting in the error handling. We can safely
900 * ship the old lmm to the client in case of failure, since it uses the same
901 * obd_unpackmd() code and can do the conversion if the MDS fails for some
902 * reason. We will not delete the old lmm data until we have written the
903 * new format lmm data in fsfilt_set_md(). */
/* obd:      the MDS device; its mds_osc_exp is used for unpack and pack.
 * inode:    inode whose "lov" EA is rewritten.
 * lmm:      on-disk EA as read, in little-endian byte order.
 * lmm_size: size of lmm in bytes.
 *
 * EAs whose magic is already LOV_MAGIC or LOV_MAGIC_JOIN are left alone.
 * Otherwise the EA is unpacked, repacked, and stored inside one
 * fsfilt_start()/fsfilt_commit() transaction.  A commit error takes
 * precedence over the new size in the final rc assignment; the unpacked
 * lsm is released before returning. */
904 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
905 struct lov_mds_md *lmm, int lmm_size)
907 struct lov_stripe_md *lsm = NULL;
/* Convert the magic to CPU order before each comparison.  The second test
 * used to byte-swap the result of comparing the raw on-disk value, which
 * made the LOV_MAGIC_JOIN check fail on big-endian hosts. */
912 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
913 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
916 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
917 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
920 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
924 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
929 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
930 if (IS_ERR(handle)) {
931 rc = PTR_ERR(handle);
935 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
937 err = fsfilt_commit(obd, inode, handle, 0);
939 rc = err ? err : lmm_size;
942 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
/* Scatter the object ids of a striping EA into a per-OST array.
 *
 * For each of the lmm_stripe_count stripes, the stripe's object id is
 * stored in ids[] at the position given by the stripe's OST index.  All
 * three fields are converted from little-endian first.
 *
 * ids:  destination array, indexed by OST index.
 * lmm:  on-disk striping EA.
 * desc: not referenced by the lines visible here.
 *
 * NOTE(review): no bound on l_ost_idx is checked in the visible lines;
 * the caller presumably sizes ids[] for every OST index - confirm. */
947 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
948 struct lov_desc *desc)
951 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
952 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
953 le64_to_cpu(lmm->lmm_objects[i].l_object_id);
956 EXPORT_SYMBOL(mds_objids_from_lmm);