4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mgc/mgc_request_server.c
33 * Author: Nathan Rutman <nathan@clusterfs.com>
36 #define DEBUG_SUBSYSTEM S_MGC
37 #define D_MGC D_CONFIG /*|D_WARNING*/
39 #include <linux/module.h>
40 #include <linux/kthread.h>
41 #include <linux/random.h>
43 #include <dt_object.h>
44 #include <lprocfs_status.h>
45 #include <lustre_dlm.h>
46 #include <lustre_disk.h>
47 #include <lustre_log.h>
48 #include <lustre_nodemap.h>
49 #include <lustre_swab.h>
50 #include <obd_class.h>
51 #include <lustre_barrier.h>
53 #include "mgc_internal.h"
/*
 * Set up the LLOG_CONFIG_ORIG_CTXT llog context on @obd (passing @disk to
 * llog_setup()), then look the context up again and point its loc_dir at
 * the MGC configs directory object, obd->u.cli.cl_mgc_configs_dir.
 *
 * NOTE(review): this listing omits lines of the function (the rest of the
 * llog_setup() argument list, any return-code check and the return
 * statement); confirm details against the complete file.
 */
55 static int mgc_local_llog_init(const struct lu_env *env,
56 struct obd_device *obd,
57 struct obd_device *disk)
59 struct llog_ctxt *ctxt;
63 rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, disk,
68 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
/* the context's directory becomes the MGC configs dir object */
70 ctxt->loc_dir = obd->u.cli.cl_mgc_configs_dir;
/*
 * Counterpart of mgc_local_llog_init(): fetch the LLOG_CONFIG_ORIG_CTXT
 * context of @obd and pass it to llog_cleanup().
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
76 static int mgc_local_llog_fini(const struct lu_env *env,
77 struct obd_device *obd)
79 struct llog_ctxt *ctxt;
82 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
83 llog_cleanup(env, ctxt);
/*
 * Prepare the MGC's local config storage on the target backing @sb.
 *
 * Visible steps: take cli->cl_mgc_mutex, initialise local OID storage on
 * lsi->lsi_dt_dev, resolve the root FID and locate the root object, find or
 * create the configs directory object and store it in
 * cli->cl_mgc_configs_dir, initialise the local llog context through
 * mgc_local_llog_init(), and take an obd reference tagged "mgc_fs".
 *
 * NOTE(review): several lines of this function (return-code checks, goto
 * labels, the return statement) are absent from this listing, so the exact
 * error-path ordering must be confirmed against the complete file.
 */
88 static int mgc_fs_setup(const struct lu_env *env, struct obd_device *obd,
89 struct super_block *sb)
91 struct lustre_sb_info *lsi = s2lsi(sb);
92 struct client_obd *cli = &obd->u.cli;
93 struct lu_fid rfid, fid;
94 struct dt_object *root, *dto;
/* a dt device must already be attached to the superblock info */
99 LASSERT(lsi->lsi_dt_dev);
101 /* The mgc fs exclusion mutex. Only one fs can be setup at a time. */
102 mutex_lock(&cli->cl_mgc_mutex);
104 /* Setup the configs dir */
105 fid.f_seq = FID_SEQ_LOCAL_NAME;
108 rc = local_oid_storage_init(env, lsi->lsi_dt_dev, &fid,
/* fetch the root FID, then locate the root object on the los device */
113 rc = dt_root_get(env, lsi->lsi_dt_dev, &rfid);
117 root = dt_locate_at(env, lsi->lsi_dt_dev, &rfid,
118 &cli->cl_mgc_los->los_dev->dd_lu_dev, NULL);
119 if (unlikely(IS_ERR(root)))
120 GOTO(out_los, rc = PTR_ERR(root));
/* look up (or create) the configs directory object below root */
122 dto = local_file_find_or_create(env, cli->cl_mgc_los, root,
/* the root reference is released right after the lookup/create call */
125 dt_object_put_nocache(env, root);
127 GOTO(out_los, rc = PTR_ERR(dto));
129 cli->cl_mgc_configs_dir = dto;
131 LASSERT(lsi->lsi_osd_exp->exp_obd->obd_lvfs_ctxt.dt);
132 rc = mgc_local_llog_init(env, obd, lsi->lsi_osd_exp->exp_obd);
136 /* We take an obd ref to ensure that we can't get to mgc_cleanup
137 * without calling mgc_fs_cleanup first.
139 class_incref(obd, "mgc_fs", obd);
141 /* We keep the cl_mgc_mutex until mgc_fs_cleanup */
/*
 * NOTE(review): the remaining lines look like the failure unwind (labels
 * are not visible here): release the configs dir object, finalise the
 * local OID storage, clear both pointers and drop the mutex.
 */
145 dt_object_put(env, cli->cl_mgc_configs_dir);
146 cli->cl_mgc_configs_dir = NULL;
150 local_oid_storage_fini(env, cli->cl_mgc_los);
152 cli->cl_mgc_los = NULL;
153 mutex_unlock(&cli->cl_mgc_mutex);
/*
 * Release what mgc_fs_setup() established: finalise the local llog
 * context, put the configs directory object, finalise the local OID
 * storage, drop the "mgc_fs" obd reference and unlock cl_mgc_mutex.
 *
 * Asserts that cli->cl_mgc_los is set, i.e. that setup was done.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
158 static int mgc_fs_cleanup(const struct lu_env *env, struct obd_device *obd)
160 struct client_obd *cli = &obd->u.cli;
163 LASSERT(cli->cl_mgc_los);
165 mgc_local_llog_fini(env, obd);
167 dt_object_put_nocache(env, cli->cl_mgc_configs_dir);
168 cli->cl_mgc_configs_dir = NULL;
170 local_oid_storage_fini(env, cli->cl_mgc_los);
171 cli->cl_mgc_los = NULL;
/* matches the class_incref(obd, "mgc_fs", obd) taken in mgc_fs_setup() */
173 class_decref(obd, "mgc_fs", obd);
/* the mutex locked in mgc_fs_setup() is released here */
174 mutex_unlock(&cli->cl_mgc_mutex);
179 /* Send target_reg message to MGS */
/*
 * Builds an MGS_TARGET_REG request on the import of @exp, copies @mti into
 * the request buffer, waits for completion with ptlrpc_queue_wait() and,
 * when a reply was received, copies the replied target info back into
 * @mti.
 *
 * The number of bytes exchanged is the part of struct mgs_target_info that
 * precedes mti_nidlist, plus mti_nid_count * LNET_NIDSTR_SIZE when
 * target_supports_large_nid(mti) is true.
 *
 * The wait is bounded by rq_delay_limit: MGC_TARGET_REG_LIMIT by default,
 * MGC_TARGET_REG_LIMIT_MAX when mti_flags has LDD_F_UPDATE or
 * LDD_F_NEED_INDEX set.
 *
 * NOTE(review): return-code checks and the return statement are not
 * visible in this listing.
 */
180 static int mgc_target_register(struct obd_export *exp,
181 struct mgs_target_info *mti)
183 size_t mti_len = offsetof(struct mgs_target_info, mti_nidlist);
184 struct ptlrpc_request *req;
185 struct mgs_target_info *req_mti, *rep_mti;
189 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MGS_TARGET_REG);
193 server_mti_print("mgc_target_register: req", mti);
/* large-NID targets: grow the buffers on both the request and reply side */
194 if (target_supports_large_nid(mti)) {
195 mti_len += mti->mti_nid_count * LNET_NIDSTR_SIZE;
197 req_capsule_set_size(&req->rq_pill, &RMF_MGS_TARGET_INFO,
198 RCL_CLIENT, mti_len);
200 req_capsule_set_size(&req->rq_pill, &RMF_MGS_TARGET_INFO,
201 RCL_SERVER, mti_len);
204 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_TARGET_REG);
206 ptlrpc_request_free(req);
210 req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
212 ptlrpc_req_finished(req);
/* the request carries a byte copy of the caller's target info */
216 memcpy(req_mti, mti, mti_len);
217 ptlrpc_request_set_replen(req);
218 CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
219 /* Limit how long we will wait for the enqueue to complete */
220 req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
222 /* if the target needs to regenerate the config log in MGS, it's better
223 * to use some longer limit to let MGC have time to change connection to
224 * another MGS (or try again with the same MGS) for the target (server)
225 * will fail and exit if the request expired due to delay limit.
227 if (mti->mti_flags & (LDD_F_UPDATE | LDD_F_NEED_INDEX))
228 req->rq_delay_limit = MGC_TARGET_REG_LIMIT_MAX;
230 rc = ptlrpc_queue_wait(req);
231 if (ptlrpc_client_replied(req)) {
232 rep_mti = req_capsule_server_get(&req->rq_pill,
233 &RMF_MGS_TARGET_INFO);
235 mti_len = offsetof(struct mgs_target_info, mti_nidlist);
237 if (target_supports_large_nid(mti))
238 mti_len += mti->mti_nid_count * LNET_NIDSTR_SIZE;
239 memcpy(mti, rep_mti, mti_len);
243 CDEBUG(D_MGC, "register %s got index = %d\n",
244 mti->mti_svname, mti->mti_stripe_index);
245 server_mti_print("mgc_target_register: rep", mti);
247 ptlrpc_req_finished(req);
252 int mgc_set_info_async_server(const struct lu_env *env,
253 struct obd_export *exp,
254 u32 keylen, void *key,
255 u32 vallen, void *val,
256 struct ptlrpc_request_set *set)
261 /* FIXME move this to mgc_process_config */
/*
 * Server-side set_info dispatch on @key. Keys handled in the visible code:
 *  - KEY_REGISTER_TARGET: @val is a struct mgs_target_info; @vallen is
 *    compared with the length up to mti_nidlist plus, for large-NID
 *    targets, mti_nid_count * LNET_NIDSTR_SIZE, then the target is
 *    registered through mgc_target_register().
 *  - KEY_SET_FS: @val is a struct super_block pointer; @vallen is compared
 *    with sizeof(struct super_block), then mgc_fs_setup() is called.
 *  - KEY_CLEAR_FS: calls mgc_fs_cleanup().
 *
 * NOTE(review): what happens on a length mismatch or an unknown key, and
 * the return statements, are not visible in this listing.
 */
262 if (KEY_IS(KEY_REGISTER_TARGET)) {
263 size_t mti_len = offsetof(struct mgs_target_info, mti_nidlist);
264 struct mgs_target_info *mti = val;
266 if (target_supports_large_nid(mti))
267 mti_len += mti->mti_nid_count * LNET_NIDSTR_SIZE;
/* the supplied value must be exactly the computed length */
269 if (vallen != mti_len)
272 CDEBUG(D_MGC, "register_target %s %#x\n",
273 mti->mti_svname, mti->mti_flags);
274 rc = mgc_target_register(exp, mti);
277 if (KEY_IS(KEY_SET_FS)) {
278 struct super_block *sb = (struct super_block *)val;
280 if (vallen != sizeof(struct super_block))
283 rc = mgc_fs_setup(env, exp->exp_obd, sb);
286 if (KEY_IS(KEY_CLEAR_FS)) {
289 rc = mgc_fs_cleanup(env, exp->exp_obd);
/*
 * Read nodemap configuration from the MGS for @cld and build a new
 * nodemap_config from it.
 *
 * Visible flow: nothing is fetched when the MGS peer is local; otherwise
 * CONFIG_READ_NRPAGES_INIT pages are allocated, an MGS_CONFIG_READ request
 * is packed with the log name, read offset, type, page shift, page count
 * and current nodemap pass, the pages are attached as a bulk sink, and
 * after the RPC the received bytes are handed page by page to
 * nodemap_process_idx_pages(). The reply also supplies the next offset
 * (II_END_OFF marks the end) and the next pass number. At the end the new
 * config is either made active with nodemap_config_set_active_mgc() or
 * released with nodemap_config_dealloc(), and the pages are freed.
 *
 * The function asserts that cld->cld_lock is held.
 *
 * NOTE(review): many lines are absent from this listing (declarations of
 * rc/i/nrpages/ealen/eof, labels, the conditions around several GOTOs and
 * around set_active/dealloc, any retry jump), so loop structure and exact
 * error handling must be confirmed against the complete file.
 * NOTE(review): the visible code always uses CONFIG_READ_NRPAGES_INIT even
 * though the in-code comment also mentions CONFIG_READ_NRPAGES.
 */
296 int mgc_process_nodemap_log(struct obd_device *obd,
297 struct config_llog_data *cld)
299 struct ptlrpc_connection *mgc_conn;
300 struct ptlrpc_request *req = NULL;
301 struct mgs_config_body *body;
302 struct mgs_config_res *res;
303 struct nodemap_config *new_config = NULL;
304 struct lu_nodemap *recent_nodemap = NULL;
305 struct ptlrpc_bulk_desc *desc;
306 struct page **pages = NULL;
307 u64 config_read_offset = 0;
308 u8 nodemap_cur_pass = 0;
311 bool mne_swab = false;
/* connection of the import behind the MGC export of this config log */
317 mgc_conn = class_exp2cliimp(cld->cld_mgcexp)->imp_connection;
319 /* don't need to get local config */
320 if (LNetIsPeerLocal(&mgc_conn->c_peer.nid))
323 /* allocate buffer for bulk transfer.
324 * if this is the first time for this mgs to read logs,
325 * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
326 * once; otherwise, it only reads increment of logs, this should be
327 * small and CONFIG_READ_NRPAGES will be used.
329 nrpages = CONFIG_READ_NRPAGES_INIT;
331 OBD_ALLOC_PTR_ARRAY(pages, nrpages);
333 GOTO(out, rc = -ENOMEM);
335 for (i = 0; i < nrpages; i++) {
336 pages[i] = alloc_page(GFP_KERNEL);
338 GOTO(out, rc = -ENOMEM);
342 if (config_read_offset == 0) {
343 new_config = nodemap_config_alloc();
344 if (IS_ERR(new_config)) {
345 rc = PTR_ERR(new_config);
350 LASSERT(mutex_is_locked(&cld->cld_lock));
351 req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
352 &RQF_MGS_CONFIG_READ);
354 GOTO(out, rc = -ENOMEM);
356 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
361 body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
363 LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
364 if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
365 >= sizeof(body->mcb_name))
366 GOTO(out, rc = -E2BIG);
367 body->mcb_offset = config_read_offset;
368 body->mcb_type = cld->cld_type;
369 body->mcb_bits = PAGE_SHIFT;
370 body->mcb_units = nrpages;
371 body->mcb_nm_cur_pass = nodemap_cur_pass;
373 /* allocate bulk transfer descriptor */
374 desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
375 PTLRPC_BULK_PUT_SINK,
377 &ptlrpc_bulk_kiov_pin_ops);
379 GOTO(out, rc = -ENOMEM);
/* every preallocated page becomes one fragment of the bulk descriptor */
381 for (i = 0; i < nrpages; i++)
382 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
385 ptlrpc_request_set_replen(req);
386 rc = ptlrpc_queue_wait(req);
390 res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
392 GOTO(out, rc = -EPROTO);
/* the reply tells where to continue; II_END_OFF means nothing is left */
394 config_read_offset = res->mcr_offset;
395 eof = config_read_offset == II_END_OFF;
396 nodemap_cur_pass = res->mcr_nm_cur_pass;
398 ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
400 GOTO(out, rc = ealen);
/* the unwrapped length may not exceed the buffer we offered */
402 if (ealen > nrpages << PAGE_SHIFT)
403 GOTO(out, rc = -EINVAL);
405 if (ealen == 0) { /* no logs transferred */
406 /* config changed since first read RPC */
407 if (config_read_offset == 0) {
408 CDEBUG(D_INFO, "nodemap config changed in transit, retrying\n");
409 GOTO(out, rc = -EAGAIN);
/* record whether the reply needs byte swapping */
416 mne_swab = req_capsule_rep_need_swab(&req->rq_pill);
418 /* When a nodemap config is received, we build a new nodemap config,
419 * with new nodemap structs. We keep track of the most recently added
420 * nodemap since the config is read ordered by nodemap_id, and so it
421 * is likely that the next record will be related. Because access to
422 * the nodemaps is single threaded until the nodemap_config is active,
423 * we don't need to reference count with recent_nodemap, though
424 * recent_nodemap should be set to NULL when the nodemap_config
425 * is either destroyed or set active.
427 for (i = 0; i < nrpages && ealen > 0; i++) {
431 ptr = kmap(pages[i]);
432 rc2 = nodemap_process_idx_pages(new_config, ptr,
436 CWARN("%s: error processing %s log nodemap: rc = %d\n",
448 ptlrpc_req_finished(req);
456 /* recent_nodemap cannot be used after set_active/dealloc */
458 nodemap_config_set_active_mgc(new_config);
460 nodemap_config_dealloc(new_config);
/* give back the bulk pages, then the array that held them */
464 for (i = 0; i < nrpages; i++) {
467 __free_page(pages[i]);
469 OBD_FREE_PTR_ARRAY(pages, nrpages);
/*
 * Handle server-side lustre_cfg commands passed in @buf.
 *
 * LCFG_LOV_ADD_OBD is reused to register a target: config buffer 1 must be
 * exactly sizeof(struct mgs_target_info) bytes (otherwise -EINVAL) and is
 * forwarded to mgc_target_register() on obd->u.cli.cl_mgc_mgsexp.
 * LCFG_LOV_DEL_OBD only logs that it is unimplemented.
 *
 * NOTE(review): unlike mgc_set_info_async_server(), the size check here
 * does not account for the large-NID length (mti_nid_count *
 * LNET_NIDSTR_SIZE); confirm that this is intended.
 * NOTE(review): other cases, the out label and the return statement are
 * not visible in this listing.
 */
474 int mgc_process_config_server(struct obd_device *obd, size_t len, void *buf)
476 struct lustre_cfg *lcfg = buf;
480 switch (lcfg->lcfg_command) {
481 case LCFG_LOV_ADD_OBD: {
482 /* Overloading this cfg command: register a new target */
483 struct mgs_target_info *mti;
485 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
486 sizeof(struct mgs_target_info))
487 GOTO(out, rc = -EINVAL);
489 mti = lustre_cfg_buf(lcfg, 1);
490 CDEBUG(D_MGC, "add_target %s %#x\n",
491 mti->mti_svname, mti->mti_flags);
492 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
495 case LCFG_LOV_DEL_OBD:
496 /* Unregister has no meaning at the moment. */
497 CERROR("lov_del_obd unimplemented\n");
/*
 * Glimpse AST callback (per its name) that forwards to barrier_handler().
 *
 * The config_llog_data is taken from lock->l_ast_data; barrier_handler()
 * receives the dt device of the superblock recorded in cld->cld_cfg.cfg_sb
 * and @data cast to a struct ptlrpc_request pointer.
 *
 * NOTE(review): the action taken when cld->cld_stopping is set, and the
 * return statement, are not visible in this listing.
 */
505 int mgc_barrier_glimpse_ast(struct ldlm_lock *lock, void *data)
507 struct config_llog_data *cld = lock->l_ast_data;
511 if (cld->cld_stopping)
514 rc = barrier_handler(s2lsi(cld->cld_cfg.cfg_sb)->lsi_dt_dev,
515 (struct ptlrpc_request *)data);
520 /* Copy a remote log locally */
/*
 * Replaces the local copy of @logname (in @lctxt) with the remote one
 * (from @rctxt). The current local log is first backed up under the name
 * "<logname>T"; if copying the remote log fails, the backup is copied back
 * over @logname. The temporary log is erased afterwards.
 *
 * NOTE(review): the conditions selecting between the "no remote llog" and
 * "failed to copy" branches, the allocation check and the return statement
 * are not visible in this listing.
 */
521 static int mgc_llog_local_copy(const struct lu_env *env,
522 struct obd_device *obd,
523 struct llog_ctxt *rctxt,
524 struct llog_ctxt *lctxt, char *logname)
531 * NB: mgc_process_server_cfg_log() always needs valid local copy
532 * and works only on it, so that defines the process:
533 * - copy current local copy to temp_log using llog_backup()
534 * - copy remote llog to logname using llog_backup()
535 * - if failed then restore logname from backup
/* room for logname, the "T" suffix and the terminating NUL */
538 OBD_ALLOC(temp_log, strlen(logname) + 2);
541 sprintf(temp_log, "%sT", logname);
543 /* copy current local llog to temp_log */
544 rc = llog_backup(env, obd, lctxt, lctxt, logname, temp_log);
/* a missing local log (-ENOENT) is not worth a warning */
545 if (rc < 0 && rc != -ENOENT)
546 CWARN("%s: failed to backup local config %s: rc = %d\n",
547 obd->obd_name, logname, rc);
549 /* build new local llog */
550 rc = llog_backup(env, obd, rctxt, lctxt, logname, logname);
552 CWARN("%s: no remote llog for %s, check MGS config\n",
553 obd->obd_name, logname);
554 llog_erase(env, lctxt, NULL, logname);
556 /* error during backup, get local one back from the copy */
557 CWARN("%s: failed to copy new config %s: rc = %d\n",
558 obd->obd_name, logname, rc);
559 llog_backup(env, obd, lctxt, lctxt, temp_log, logname);
/* the temporary backup is no longer needed */
561 llog_erase(env, lctxt, NULL, temp_log);
/* same size expression as the allocation above */
563 OBD_FREE(temp_log, strlen(logname) + 2);
/*
 * Decide which llog context a server should read the config log
 * cld->cld_logname from, refreshing the local copy when possible.
 *
 * Visible behaviour:
 *  - On a non-MGS server that has a local configs directory, and unless
 *    @local_only is set or the dt device is read-only, the remote log
 *    (*@ctxt) is copied locally with mgc_llog_local_copy().
 *  - When @local_only is set or the copy failed, an unusable local log
 *    yields -EIO, and an empty local log (other than PARAMS_FILENAME)
 *    yields -ENOENT; otherwise the local copy is used.
 *  - The reference on *@ctxt is then dropped in favour of the local
 *    context (the reassignment itself is not visible here).
 *  - With @local_only set but no usable local context, -EIO is returned.
 *  - On a read-only non-MGS server the LLOG_CONFIG_REPL_CTXT context is
 *    fetched and parsed with class_config_parse_llog().
 *
 * NOTE(review): a number of lines (parts of conditions, the out_pop label,
 * the final parse and return) are absent from this listing; confirm the
 * control flow against the complete file.
 */
567 int mgc_process_server_cfg_log(struct lu_env *env, struct llog_ctxt **ctxt,
568 struct lustre_sb_info *lsi,
569 struct obd_device *mgc,
570 struct config_llog_data *cld, int local_only)
572 struct llog_ctxt *lctxt = llog_get_context(mgc, LLOG_CONFIG_ORIG_CTXT);
573 struct client_obd *cli = &mgc->u.cli;
576 /* Copy the setup log locally if we can. Don't mess around if we're
577 * running an MGS though (logs are already local).
579 if (lctxt && lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
580 cli->cl_mgc_configs_dir &&
581 lu2dt_dev(cli->cl_mgc_configs_dir->do_lu.lo_dev) ==
583 if (!local_only && !lsi->lsi_dt_dev->dd_rdonly) {
584 /* Only try to copy log if we have the lock. */
585 CDEBUG(D_INFO, "%s: copy local log %s\n",
586 mgc->obd_name, cld->cld_logname);
588 rc = mgc_llog_local_copy(env, mgc, *ctxt, lctxt,
591 lsi->lsi_flags &= ~LDD_F_NO_LOCAL_LOGS;
/* reached when told to stay local or when the copy above failed */
593 if (local_only || rc) {
594 if (unlikely(lsi->lsi_flags & LDD_F_NO_LOCAL_LOGS) ||
/*
 * NOTE(review): "accessbile" in the message below is a typo for
 * "accessible"; it is runtime text and is left unchanged here.
 */
596 CWARN("%s: local log %s are not valid and/or remote logs are not accessbile rc = %d\n",
597 mgc->obd_name, cld->cld_logname, rc);
598 GOTO(out_pop, rc = -EIO);
/* an empty local log is fatal, except for the params log */
601 if (strcmp(cld->cld_logname, PARAMS_FILENAME) != 0 &&
602 llog_is_empty(env, lctxt, cld->cld_logname)) {
603 LCONSOLE_ERROR_MSG(0x13a,
604 "Failed to get MGS log %s and no local copy.\n",
606 GOTO(out_pop, rc = -ENOENT);
609 "%s: Failed to get MGS log %s, using local copy for now, will try to update later.\n",
610 mgc->obd_name, cld->cld_logname);
613 /* Now, whether we copied or not, start using the local llog.
614 * If we failed to copy, we'll start using whatever the old
617 llog_ctxt_put(*ctxt);
620 } else if (local_only) { /* no local log at client side */
621 GOTO(out_pop, rc = -EIO);
/* read-only non-MGS server: handled separately below */
625 if (lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
626 lsi->lsi_dt_dev->dd_rdonly) {
627 struct llog_ctxt *rctxt;
629 /* Under readonly mode, we may have no local copy or local
630 * copy is incomplete, so try to use remote llog firstly.
632 rctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
635 rc = class_config_parse_llog(env, rctxt, cld->cld_logname,
637 llog_ctxt_put(rctxt);
641 __llog_ctxt_put(env, lctxt);