4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mgc/mgc_request_server.c
33 * Author: Nathan Rutman <nathan@clusterfs.com>
36 #define DEBUG_SUBSYSTEM S_MGC
37 #define D_MGC D_CONFIG /*|D_WARNING*/
39 #include <linux/module.h>
40 #include <linux/kthread.h>
41 #include <linux/random.h>
43 #include <dt_object.h>
44 #include <lprocfs_status.h>
45 #include <lustre_dlm.h>
46 #include <lustre_disk.h>
47 #include <lustre_log.h>
48 #include <lustre_nodemap.h>
49 #include <lustre_swab.h>
50 #include <obd_class.h>
51 #include <lustre_barrier.h>
53 #include "mgc_internal.h"
/*
 * Set up the local config llog context for the MGC.
 *
 * Calls llog_setup() for LLOG_CONFIG_ORIG_CTXT on the llog group of @obd
 * (&obd->obd_olg), passing @disk, then looks the context up with
 * llog_get_context() and stores obd->u.cli.cl_mgc_configs_dir in its
 * loc_dir field.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing, so the remaining llog_setup()
 * arguments and any rc handling are not visible here.
 */
55 static int mgc_local_llog_init(const struct lu_env *env,
56 struct obd_device *obd,
57 struct obd_device *disk)
59 struct llog_ctxt *ctxt;
63 rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, disk,
68 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
/* cl_mgc_configs_dir is assigned by mgc_fs_setup() before it calls us */
70 ctxt->loc_dir = obd->u.cli.cl_mgc_configs_dir;
/*
 * Tear down the local config llog context: fetch the
 * LLOG_CONFIG_ORIG_CTXT context of @obd and pass it to llog_cleanup().
 *
 * NOTE(review): original source lines are skipped in this listing (see the
 * leading line numbers); the return statement is among the lines not shown.
 */
76 static int mgc_local_llog_fini(const struct lu_env *env,
77 struct obd_device *obd)
79 struct llog_ctxt *ctxt;
82 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
83 llog_cleanup(env, ctxt);
/*
 * Set up local config storage for the MGC on the device behind @sb.
 *
 * Visible steps, in order:
 *  - take cli->cl_mgc_mutex;
 *  - initialise local OID storage with local_oid_storage_init();
 *  - fetch the root FID with dt_root_get() and locate the root object;
 *  - find or create an object under the root with
 *    local_file_find_or_create() and remember it in cl_mgc_configs_dir;
 *  - initialise the local llog context with mgc_local_llog_init();
 *  - take an "mgc_fs" reference on @obd.
 * The statements at the end release the configs dir object, the local OID
 * storage and the mutex.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing. The error checks after most
 * calls, the cleanup labels (out_los is referenced but not shown) and the
 * conditions guarding the cleanup statements are not visible here.
 */
88 static int mgc_fs_setup(const struct lu_env *env, struct obd_device *obd,
89 struct super_block *sb)
91 struct lustre_sb_info *lsi = s2lsi(sb);
92 struct client_obd *cli = &obd->u.cli;
93 struct lu_fid rfid, fid;
94 struct dt_object *root, *dto;
99 LASSERT(lsi->lsi_dt_dev);
101 /* The mgc fs exclusion mutex. Only one fs can be set up at a time. */
102 mutex_lock(&cli->cl_mgc_mutex);
104 /* Set up the configs dir */
105 fid.f_seq = FID_SEQ_LOCAL_NAME;
108 rc = local_oid_storage_init(env, lsi->lsi_dt_dev, &fid,
113 rc = dt_root_get(env, lsi->lsi_dt_dev, &rfid);
117 root = dt_locate_at(env, lsi->lsi_dt_dev, &rfid,
118 &cli->cl_mgc_los->los_dev->dd_lu_dev, NULL);
119 if (unlikely(IS_ERR(root)))
120 GOTO(out_los, rc = PTR_ERR(root));
122 dto = local_file_find_or_create(env, cli->cl_mgc_los, root,
/* root was only needed as the parent for the lookup above */
125 dt_object_put_nocache(env, root);
127 GOTO(out_los, rc = PTR_ERR(dto));
129 cli->cl_mgc_configs_dir = dto;
131 LASSERT(lsi->lsi_osd_exp->exp_obd->obd_lvfs_ctxt.dt);
132 rc = mgc_local_llog_init(env, obd, lsi->lsi_osd_exp->exp_obd);
136 /* We take an obd ref to ensure that we can't get to mgc_cleanup
137 * without calling mgc_fs_cleanup first.
139 class_incref(obd, "mgc_fs", obd);
141 /* We keep cl_mgc_mutex held until mgc_fs_cleanup */
/*
 * NOTE(review): cleanup statements follow; their labels and guarding
 * conditions are on lines missing from this listing.
 */
145 dt_object_put(env, cli->cl_mgc_configs_dir);
146 cli->cl_mgc_configs_dir = NULL;
150 local_oid_storage_fini(env, cli->cl_mgc_los);
152 cli->cl_mgc_los = NULL;
153 mutex_unlock(&cli->cl_mgc_mutex);
/*
 * Release what mgc_fs_setup() acquired, in reverse order: finalise the
 * local llog context, drop the configs directory object, finalise the
 * local OID storage, drop the "mgc_fs" reference on @obd and unlock
 * cl_mgc_mutex.
 *
 * Asserts that cl_mgc_los is set, i.e. that a setup is in effect.
 *
 * NOTE(review): original source lines are skipped in this listing (see the
 * leading line numbers); any guard around the dt_object_put_nocache() call
 * and the return statement are not visible here.
 */
158 static int mgc_fs_cleanup(const struct lu_env *env, struct obd_device *obd)
160 struct client_obd *cli = &obd->u.cli;
163 LASSERT(cli->cl_mgc_los);
165 mgc_local_llog_fini(env, obd);
167 dt_object_put_nocache(env, cli->cl_mgc_configs_dir);
168 cli->cl_mgc_configs_dir = NULL;
170 local_oid_storage_fini(env, cli->cl_mgc_los);
171 cli->cl_mgc_los = NULL;
/* pairs with the class_incref() done in mgc_fs_setup() */
173 class_decref(obd, "mgc_fs", obd);
/* releases the mutex that mgc_fs_setup() left locked */
174 mutex_unlock(&cli->cl_mgc_mutex);
179 /* Send target_reg message to MGS.
 *
 * Copies @mti into an RQF_MGS_TARGET_REG request on the import of @exp and
 * waits for it with ptlrpc_queue_wait(). When ptlrpc_client_replied() is
 * true, the mgs_target_info from the reply is copied back over @mti, so
 * the caller sees the values the MGS returned (the debug message prints
 * mti_stripe_index as the assigned index).
 *
 * The wait is bounded by rq_delay_limit: MGC_TARGET_REG_LIMIT by default,
 * MGC_TARGET_REG_LIMIT_MAX when LDD_F_UPDATE or LDD_F_NEED_INDEX is set in
 * mti->mti_flags.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing; the NULL checks after the
 * allocation and capsule lookups and the return statement are not shown.
 */
180 static int mgc_target_register(struct obd_export *exp,
181 struct mgs_target_info *mti)
183 struct ptlrpc_request *req;
184 struct mgs_target_info *req_mti, *rep_mti;
188 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
189 &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
194 req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
/* NOTE(review): the condition guarding this early release is not shown */
196 ptlrpc_req_finished(req);
/* the request carries a full copy of the caller's mgs_target_info */
200 memcpy(req_mti, mti, sizeof(*req_mti));
201 ptlrpc_request_set_replen(req);
202 CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
203 /* Limit how long we will wait for the enqueue to complete */
204 req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
206 /* if the target needs to regenerate the config log in MGS, it's better
207 * to use some longer limit to let MGC have time to change connection to
208 * another MGS (or try again with the same MGS) for the target (server)
209 * will fail and exit if the request expired due to delay limit.
211 if (mti->mti_flags & (LDD_F_UPDATE | LDD_F_NEED_INDEX))
212 req->rq_delay_limit = MGC_TARGET_REG_LIMIT_MAX;
214 rc = ptlrpc_queue_wait(req);
215 if (ptlrpc_client_replied(req)) {
216 rep_mti = req_capsule_server_get(&req->rq_pill,
217 &RMF_MGS_TARGET_INFO);
219 memcpy(mti, rep_mti, sizeof(*rep_mti));
222 CDEBUG(D_MGC, "register %s got index = %d\n",
223 mti->mti_svname, mti->mti_stripe_index);
225 ptlrpc_req_finished(req);
230 int mgc_set_info_async_server(const struct lu_env *env,
231 struct obd_export *exp,
232 u32 keylen, void *key,
233 u32 vallen, void *val,
234 struct ptlrpc_request_set *set)
239 /* FIXME move this to mgc_process_config */
/*
 * Server-side set_info handling, dispatched on @key:
 *  - KEY_REGISTER_TARGET: @val is treated as a struct mgs_target_info
 *    (@vallen is compared against its size) and passed to
 *    mgc_target_register() on @exp;
 *  - KEY_SET_FS: @val is treated as a struct super_block (@vallen is
 *    compared against its size) and passed to mgc_fs_setup();
 *  - KEY_CLEAR_FS: calls mgc_fs_cleanup() on the obd of @exp.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing; what happens on a size
 * mismatch, how each branch returns, and the result for any other key are
 * not visible here.
 */
240 if (KEY_IS(KEY_REGISTER_TARGET)) {
241 struct mgs_target_info *mti;
243 if (vallen != sizeof(struct mgs_target_info))
245 mti = (struct mgs_target_info *)val;
246 CDEBUG(D_MGC, "register_target %s %#x\n",
247 mti->mti_svname, mti->mti_flags);
248 rc = mgc_target_register(exp, mti);
251 if (KEY_IS(KEY_SET_FS)) {
252 struct super_block *sb = (struct super_block *)val;
254 if (vallen != sizeof(struct super_block))
257 rc = mgc_fs_setup(env, exp->exp_obd, sb);
260 if (KEY_IS(KEY_CLEAR_FS)) {
263 rc = mgc_fs_cleanup(env, exp->exp_obd);
/*
 * Read the nodemap configuration from the MGS and build a new
 * nodemap_config from it.
 *
 * Visible steps:
 *  - look up the connection of the MGC import and test whether its peer
 *    NID is local (LNetIsPeerLocal);
 *  - allocate CONFIG_READ_NRPAGES_INIT pages for the bulk transfer;
 *  - when config_read_offset is 0, allocate a fresh nodemap_config;
 *  - build an MGS_CONFIG_READ request naming cld->cld_logname, with the
 *    current offset, page size (PAGE_SHIFT), page count and nodemap pass,
 *    and attach the pages as a PTLRPC_BULK_PUT_SINK descriptor;
 *  - after the reply, pick up the next offset and pass from the
 *    mgs_config_res, unwrap the bulk data and feed each page to
 *    nodemap_process_idx_pages();
 *  - finally either activate the new config with
 *    nodemap_config_set_active_mgc() or free it with
 *    nodemap_config_dealloc(), and free the pages.
 *
 * The caller must hold cld->cld_lock (asserted below).
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and many are skipped in this listing. Labels, the control flow that
 * presumably repeats the read until eof, several local declarations
 * (rc, i, nrpages, ealen, eof, ptr, rc2), most error checks and the return
 * statement are not visible here.
 */
270 int mgc_process_nodemap_log(struct obd_device *obd,
271 struct config_llog_data *cld)
273 struct ptlrpc_connection *mgc_conn;
274 struct ptlrpc_request *req = NULL;
275 struct mgs_config_body *body;
276 struct mgs_config_res *res;
277 struct nodemap_config *new_config = NULL;
278 struct lu_nodemap *recent_nodemap = NULL;
279 struct ptlrpc_bulk_desc *desc;
280 struct page **pages = NULL;
281 u64 config_read_offset = 0;
282 u8 nodemap_cur_pass = 0;
285 bool mne_swab = false;
/* connection of the MGC import; its peer NID is tested just below */
291 mgc_conn = class_exp2cliimp(cld->cld_mgcexp)->imp_connection;
293 /* don't need to get local config */
294 if (LNetIsPeerLocal(&mgc_conn->c_peer.nid))
297 /* allocate buffer for bulk transfer.
298 * if this is the first time for this mgs to read logs,
299 * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
300 * once; otherwise, it only reads increment of logs, this should be
301 * small and CONFIG_READ_NRPAGES will be used.
303 nrpages = CONFIG_READ_NRPAGES_INIT;
305 OBD_ALLOC_PTR_ARRAY(pages, nrpages);
307 GOTO(out, rc = -ENOMEM);
309 for (i = 0; i < nrpages; i++) {
310 pages[i] = alloc_page(GFP_KERNEL);
312 GOTO(out, rc = -ENOMEM);
316 if (config_read_offset == 0) {
317 new_config = nodemap_config_alloc();
318 if (IS_ERR(new_config)) {
319 rc = PTR_ERR(new_config);
324 LASSERT(mutex_is_locked(&cld->cld_lock));
325 req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
326 &RQF_MGS_CONFIG_READ);
328 GOTO(out, rc = -ENOMEM);
330 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
335 body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
337 LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
338 if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
339 >= sizeof(body->mcb_name))
340 GOTO(out, rc = -E2BIG);
341 body->mcb_offset = config_read_offset;
342 body->mcb_type = cld->cld_type;
343 body->mcb_bits = PAGE_SHIFT;
344 body->mcb_units = nrpages;
345 body->mcb_nm_cur_pass = nodemap_cur_pass;
347 /* allocate bulk transfer descriptor */
348 desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
349 PTLRPC_BULK_PUT_SINK,
351 &ptlrpc_bulk_kiov_pin_ops);
353 GOTO(out, rc = -ENOMEM);
/* every preallocated page becomes one fragment of the bulk descriptor */
355 for (i = 0; i < nrpages; i++)
356 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
359 ptlrpc_request_set_replen(req);
360 rc = ptlrpc_queue_wait(req);
364 res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
366 GOTO(out, rc = -EPROTO);
/*
 * The reply supplies the offset and pass to use for the next read;
 * eof is set once the returned offset equals II_END_OFF.
 */
368 config_read_offset = res->mcr_offset;
369 eof = config_read_offset == II_END_OFF;
370 nodemap_cur_pass = res->mcr_nm_cur_pass;
372 ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
374 GOTO(out, rc = ealen);
/* reject a length larger than the pages posted for the transfer */
376 if (ealen > nrpages << PAGE_SHIFT)
377 GOTO(out, rc = -EINVAL);
379 if (ealen == 0) { /* no logs transferred */
380 /* config changed since first read RPC */
381 if (config_read_offset == 0) {
382 CDEBUG(D_INFO, "nodemap config changed in transit, retrying\n");
383 GOTO(out, rc = -EAGAIN);
390 mne_swab = req_capsule_rep_need_swab(&req->rq_pill);
392 /* When a nodemap config is received, we build a new nodemap config,
393 * with new nodemap structs. We keep track of the most recently added
394 * nodemap since the config is read ordered by nodemap_id, and so it
395 * is likely that the next record will be related. Because access to
396 * the nodemaps is single threaded until the nodemap_config is active,
397 * we don't need to reference count with recent_nodemap, though
398 * recent_nodemap should be set to NULL when the nodemap_config
399 * is either destroyed or set active.
401 for (i = 0; i < nrpages && ealen > 0; i++) {
405 ptr = kmap(pages[i]);
406 rc2 = nodemap_process_idx_pages(new_config, ptr,
410 CWARN("%s: error processing %s log nodemap: rc = %d\n",
422 ptlrpc_req_finished(req);
430 /* recent_nodemap cannot be used after set_active/dealloc */
/*
 * NOTE(review): the conditions choosing between activating and freeing
 * new_config are on lines missing from this listing.
 */
432 nodemap_config_set_active_mgc(new_config);
434 nodemap_config_dealloc(new_config);
/* free the bulk pages allocated above, then the page array itself */
438 for (i = 0; i < nrpages; i++) {
441 __free_page(pages[i]);
443 OBD_FREE_PTR_ARRAY(pages, nrpages);
/*
 * Handle server-side config commands carried in @buf, which is interpreted
 * as a struct lustre_cfg and dispatched on lcfg_command:
 *  - LCFG_LOV_ADD_OBD: reused here to register a new target. Buffer 1 of
 *    the lustre_cfg must be exactly sizeof(struct mgs_target_info),
 *    otherwise rc is -EINVAL; the buffer is then passed to
 *    mgc_target_register() on obd->u.cli.cl_mgc_mgsexp;
 *  - LCFG_LOV_DEL_OBD: only logs that the command is unimplemented.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing; the rc set for
 * LCFG_LOV_DEL_OBD, any default case, the out label and the return
 * statement are not visible here. @len is not used in the visible lines.
 */
448 int mgc_process_config_server(struct obd_device *obd, size_t len, void *buf)
450 struct lustre_cfg *lcfg = buf;
454 switch (lcfg->lcfg_command) {
455 case LCFG_LOV_ADD_OBD: {
456 /* Overloading this cfg command: register a new target */
457 struct mgs_target_info *mti;
/* buffer 1 must hold exactly one mgs_target_info */
459 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
460 sizeof(struct mgs_target_info))
461 GOTO(out, rc = -EINVAL);
463 mti = lustre_cfg_buf(lcfg, 1);
464 CDEBUG(D_MGC, "add_target %s %#x\n",
465 mti->mti_svname, mti->mti_flags);
466 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
469 case LCFG_LOV_DEL_OBD:
470 /* Unregister has no meaning at the moment. */
471 CERROR("lov_del_obd unimplemented\n");
/*
 * Glimpse AST for the barrier lock.
 *
 * Takes the config_llog_data from lock->l_ast_data, tests cld_stopping,
 * and calls barrier_handler() with the dt device of the superblock stored
 * in cld->cld_cfg.cfg_sb and @data cast to a struct ptlrpc_request.
 *
 * NOTE(review): original source lines are skipped in this listing (see the
 * leading line numbers); the action taken when cld_stopping is set and the
 * return statement are not visible here.
 */
479 int mgc_barrier_glimpse_ast(struct ldlm_lock *lock, void *data)
481 struct config_llog_data *cld = lock->l_ast_data;
485 if (cld->cld_stopping)
488 rc = barrier_handler(s2lsi(cld->cld_cfg.cfg_sb)->lsi_dt_dev,
489 (struct ptlrpc_request *)data);
494 /* Copy a remote log locally.
 *
 * @rctxt is the llog context the log is copied from and @lctxt the local
 * context it is copied into; @logname names the log in both.
 *
 * The existing local log is first backed up within @lctxt under a
 * temporary name formed by appending "T" to @logname. The remote log is
 * then copied over the local one with llog_backup(). The visible recovery
 * actions are erasing the local log (no remote llog) and restoring it from
 * the temporary copy (error during backup). The temporary log is erased
 * and its name buffer freed at the end.
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and several are skipped in this listing; the declarations of temp_log
 * and rc, the conditions selecting each recovery action, the jump taken
 * when the first backup fails and the return statement are not shown.
 */
495 static int mgc_llog_local_copy(const struct lu_env *env,
496 struct obd_device *obd,
497 struct llog_ctxt *rctxt,
498 struct llog_ctxt *lctxt, char *logname)
505 * - copy it to backup using llog_backup()
506 * - copy remote llog to logname using llog_backup()
507 * - if failed then move bakup to logname again
/* strlen + 2: room for the one-character suffix and the terminating NUL */
509 OBD_ALLOC(temp_log, strlen(logname) + 2);
512 sprintf(temp_log, "%sT", logname);
514 /* make a copy of local llog at first */
515 rc = llog_backup(env, obd, lctxt, lctxt, logname, temp_log);
/* a missing local log (-ENOENT) is not treated as a failure here */
516 if (rc < 0 && rc != -ENOENT)
518 /* copy remote llog to the local copy */
519 rc = llog_backup(env, obd, rctxt, lctxt, logname, logname);
521 /* no remote llog, delete local one too */
522 llog_erase(env, lctxt, NULL, logname);
524 /* error during backup, get local one back from the copy */
525 llog_backup(env, obd, lctxt, lctxt, temp_log, logname);
527 CERROR("%s: failed to copy remote log %s: rc = %d\n",
528 obd->obd_name, logname, rc);
/* the free size must match the OBD_ALLOC() size used above */
530 llog_erase(env, lctxt, NULL, temp_log);
531 OBD_FREE(temp_log, strlen(logname) + 2);
/*
 * Decide which llog context a server uses for the config log of @cld and,
 * where possible, refresh the local copy from the MGS first.
 *
 * Visible behaviour:
 *  - the local LLOG_CONFIG_ORIG_CTXT context of @mgc is looked up;
 *  - for a non-MGS server with a configs directory, when @local_only is
 *    not set and the device is not read-only, the log is copied locally
 *    with mgc_llog_local_copy() using *@ctxt as the source;
 *  - when @local_only is set or rc is non-zero, the local copy is checked:
 *    -EIO with a warning if LDD_F_NO_LOCAL_LOGS is set, -ENOENT if the log
 *    is not PARAMS_FILENAME and the local llog is empty, otherwise a
 *    message that the local copy is used for now;
 *  - *@ctxt is released with llog_ctxt_put();
 *  - outside that branch, @local_only yields -EIO;
 *  - for a read-only non-MGS server, the LLOG_CONFIG_REPL_CTXT context is
 *    fetched and the log parsed through it with class_config_parse_llog();
 *  - the local context reference is dropped with __llog_ctxt_put().
 *
 * NOTE(review): the leading numbers are the original source line numbers
 * and many are skipped in this listing. Parts of several conditions, the
 * assignment that presumably replaces *@ctxt after llog_ctxt_put(), the
 * out_pop label, the declaration of rc and the return statement are not
 * visible here.
 */
535 int mgc_process_server_cfg_log(struct lu_env *env, struct llog_ctxt **ctxt,
536 struct lustre_sb_info *lsi,
537 struct obd_device *mgc,
538 struct config_llog_data *cld, int local_only)
540 struct llog_ctxt *lctxt = llog_get_context(mgc, LLOG_CONFIG_ORIG_CTXT);
541 struct client_obd *cli = &mgc->u.cli;
544 /* Copy the setup log locally if we can. Don't mess around if we're
545 * running an MGS though (logs are already local).
547 if (lctxt && lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
548 cli->cl_mgc_configs_dir &&
549 lu2dt_dev(cli->cl_mgc_configs_dir->do_lu.lo_dev) ==
551 if (!local_only && !lsi->lsi_dt_dev->dd_rdonly) {
552 /* Only try to copy log if we have the lock. */
553 CDEBUG(D_INFO, "%s: copy local log %s\n",
554 mgc->obd_name, cld->cld_logname);
556 rc = mgc_llog_local_copy(env, mgc, *ctxt, lctxt,
559 lsi->lsi_flags &= ~LDD_F_NO_LOCAL_LOGS;
/* no copy was attempted, or it failed: validate the local copy */
561 if (local_only || rc) {
562 if (unlikely(lsi->lsi_flags & LDD_F_NO_LOCAL_LOGS) ||
564 CWARN("%s: local log %s are not valid and/or remote logs are not accessbile rc = %d\n",
565 mgc->obd_name, cld->cld_logname, rc);
566 GOTO(out_pop, rc = -EIO);
/* the PARAMS_FILENAME log is exempt from the emptiness check */
569 if (strcmp(cld->cld_logname, PARAMS_FILENAME) != 0 &&
570 llog_is_empty(env, lctxt, cld->cld_logname)) {
571 LCONSOLE_ERROR_MSG(0x13a,
572 "Failed to get MGS log %s and no local copy.\n",
574 GOTO(out_pop, rc = -ENOENT);
577 "%s: Failed to get MGS log %s, using local copy for now, will try to update later.\n",
578 mgc->obd_name, cld->cld_logname);
581 /* Now, whether we copied or not, start using the local llog.
582 * If we failed to copy, we'll start using whatever the old
585 llog_ctxt_put(*ctxt);
588 } else if (local_only) { /* no local log at client side */
589 GOTO(out_pop, rc = -EIO);
593 if (lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
594 lsi->lsi_dt_dev->dd_rdonly) {
595 struct llog_ctxt *rctxt;
597 /* Under readonly mode, we may have no local copy or local
598 * copy is incomplete, so try to use remote llog firstly.
600 rctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
603 rc = class_config_parse_llog(env, rctxt, cld->cld_logname,
605 llog_ctxt_put(rctxt);
609 __llog_ctxt_put(env, lctxt);