Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
84
85         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 exp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, exp, localdata);
94
95         if (rc) {
96                 class_disconnect(exp);
97         } else {
98                 class_export_put(exp);
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data)
107 {
108         ENTRY;
109
110         if (exp == NULL || obd == NULL || cluuid == NULL)
111                 RETURN(-EINVAL);
112
113         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
114
115         if (data != NULL) {
116                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
117                 exp->exp_connect_flags = data->ocd_connect_flags;
118                 data->ocd_version = LUSTRE_VERSION_CODE;
119         }
120
121         RETURN(0);
122 }
123
124 static int mgs_disconnect(struct obd_export *exp)
125 {
126         int rc;
127         ENTRY;
128
129         LASSERT(exp);
130
131         class_export_get(exp);
132         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
133
134         /* Disconnect early so that clients can't keep using export */
135         rc = class_disconnect(exp);
136         ldlm_cancel_locks_for_export(exp);
137
138         lprocfs_exp_cleanup(exp);
139
140         /* complete all outstanding replies */
141         spin_lock(&exp->exp_lock);
142         while (!list_empty(&exp->exp_outstanding_replies)) {
143                 struct ptlrpc_reply_state *rs =
144                         list_entry(exp->exp_outstanding_replies.next,
145                                    struct ptlrpc_reply_state, rs_exp_list);
146                 struct ptlrpc_service *svc = rs->rs_service;
147
148                 spin_lock(&svc->srv_lock);
149                 list_del_init(&rs->rs_exp_list);
150                 ptlrpc_schedule_difficult_reply(rs);
151                 spin_unlock(&svc->srv_lock);
152         }
153         spin_unlock(&exp->exp_lock);
154
155         class_export_put(exp);
156         RETURN(rc);
157 }
158
159 static int mgs_cleanup(struct obd_device *obd);
160 static int mgs_handle(struct ptlrpc_request *req);
161
162 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
163                          struct obd_device *tgt, int count,
164                          struct llog_catid *logid, struct obd_uuid *uuid)
165 {
166         int rc;
167         ENTRY;
168
169         LASSERT(olg == &obd->obd_olg);
170         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
171                         &llog_lvfs_ops);
172         RETURN(rc);
173 }
174
175 static int mgs_llog_finish(struct obd_device *obd, int count)
176 {
177         struct llog_ctxt *ctxt;
178         int rc = 0;
179         ENTRY;
180
181         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
182         if (ctxt)
183                 rc = llog_cleanup(ctxt);
184
185         RETURN(rc);
186 }
187
188 /* Start the MGS obd */
189 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
190 {
191         struct lprocfs_static_vars lvars;
192         struct mgs_obd *mgs = &obd->u.mgs;
193         struct lustre_mount_info *lmi;
194         struct lustre_sb_info *lsi;
195         struct vfsmount *mnt;
196         int rc = 0;
197         ENTRY;
198
199         CDEBUG(D_CONFIG, "Starting MGS\n");
200
201         /* Find our disk */
202         lmi = server_get_mount(obd->obd_name);
203         if (!lmi)
204                 RETURN(rc = -EINVAL);
205
206         mnt = lmi->lmi_mnt;
207         lsi = s2lsi(lmi->lmi_sb);
208         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
209         if (IS_ERR(obd->obd_fsops))
210                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
211
212         /* namespace for mgs llog */
213         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
214                                                 LDLM_NAMESPACE_MODEST);
215         if (obd->obd_namespace == NULL)
216                 GOTO(err_ops, rc = -ENOMEM);
217
218         /* ldlm setup */
219         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
220                            "mgs_ldlm_client", &obd->obd_ldlm_client);
221
222         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
223
224         rc = mgs_fs_setup(obd, mnt);
225         if (rc) {
226                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
227                        obd->obd_name, rc);
228                 GOTO(err_ns, rc);
229         }
230
231         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
232         if (rc)
233                 GOTO(err_fs, rc);
234
235         /* No recovery for MGC's */
236         obd->obd_replayable = 0;
237
238         /* Internal mgs setup */
239         mgs_init_fsdb_list(obd);
240         sema_init(&mgs->mgs_sem, 1);
241
242         /* Start the service threads */
243         mgs->mgs_service =
244                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
245                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
246                                 MGC_REPLY_PORTAL, 2000,
247                                 mgs_handle, LUSTRE_MGS_NAME,
248                                 obd->obd_proc_entry, target_print_req,
249                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
250                                 "ll_mgs", LCT_MD_THREAD);
251
252         if (!mgs->mgs_service) {
253                 CERROR("failed to start service\n");
254                 GOTO(err_llog, rc = -ENOMEM);
255         }
256
257         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
258         if (rc)
259                 GOTO(err_thread, rc);
260
261         /* Setup proc */
262         lprocfs_mgs_init_vars(&lvars);
263         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
264                 lproc_mgs_setup(obd);
265         }
266
267         ping_evictor_start();
268
269         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
270
271         RETURN(0);
272
273 err_thread:
274         ptlrpc_unregister_service(mgs->mgs_service);
275 err_llog:
276         obd_llog_finish(obd, 0);
277 err_fs:
278         /* No extra cleanup needed for llog_init_commit_thread() */
279         mgs_fs_cleanup(obd);
280 err_ns:
281         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
282         obd->obd_namespace = NULL;
283 err_ops:
284         fsfilt_put_ops(obd->obd_fsops);
285 err_put:
286         server_put_mount(obd->obd_name, mnt);
287         mgs->mgs_sb = 0;
288         return rc;
289 }
290
291 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
292 {
293         int rc = 0;
294         ENTRY;
295
296         switch (stage) {
297         case OBD_CLEANUP_EARLY:
298         case OBD_CLEANUP_EXPORTS:
299                 rc = obd_llog_finish(obd, 0);
300                 break;
301         }
302         RETURN(rc);
303 }
304
305 /**
306  * Performs cleanup procedures for passed \a obd given it is mgs obd.
307  */
308 static int mgs_cleanup(struct obd_device *obd)
309 {
310         struct mgs_obd *mgs = &obd->u.mgs;
311         ENTRY;
312
313         if (mgs->mgs_sb == NULL)
314                 RETURN(0);
315
316         ping_evictor_stop();
317
318         ptlrpc_unregister_service(mgs->mgs_service);
319
320         mgs_cleanup_fsdb_list(obd);
321         lproc_mgs_cleanup(obd);
322         mgs_fs_cleanup(obd);
323
324         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
325         mgs->mgs_sb = NULL;
326
327         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
328         obd->obd_namespace = NULL;
329
330         fsfilt_put_ops(obd->obd_fsops);
331
332         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
333         RETURN(0);
334 }
335
336 /* similar to filter_prepare_destroy */
337 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
338                             struct lustre_handle *lockh)
339 {
340         struct ldlm_res_id res_id;
341         int rc, flags = 0;
342         ENTRY;
343
344         rc = mgc_fsname2resid(fsname, &res_id);
345         if (!rc)
346                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
347                                             LDLM_PLAIN, NULL, LCK_EX,
348                                             &flags, ldlm_blocking_ast,
349                                             ldlm_completion_ast, NULL,
350                                             fsname, 0, NULL, lockh);
351         if (rc)
352                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
353
354         RETURN(rc);
355 }
356
357 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
358 {
359         ENTRY;
360         ldlm_lock_decref(lockh, LCK_EX);
361         RETURN(0);
362 }
363
364 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
365                             struct lustre_handle *lockh)
366 {
367         int lockrc;
368
369         if (fsname[0]) {
370                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
371                 if (lockrc != ELDLM_OK)
372                         CERROR("lock error %d for fs %s\n", lockrc,
373                                fsname);
374                 else
375                         mgs_put_cfg_lock(lockh);
376         }
377 }
378
379 /* rc=0 means ok
380       1 means update
381      <0 means error */
382 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
383 {
384         int rc;
385         ENTRY;
386
387         rc = mgs_check_index(obd, mti);
388         if (rc == 0) {
389                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
390                                    "this MGS does not know about it. Assuming"
391                                    " writeconf.\n", mti->mti_svname);
392                 mti->mti_flags |= LDD_F_WRITECONF;
393                 rc = 1;
394         } else if (rc == -1) {
395                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
396                                    "disappeared! Regenerating all logs.\n",
397                                    mti->mti_fsname);
398                 mti->mti_flags |= LDD_F_WRITECONF;
399                 rc = 1;
400         } else {
401                 /* Index is correctly marked as used */
402
403                 /* If the logs don't contain the mti_nids then add
404                    them as failover nids */
405                 rc = mgs_check_failnid(obd, mti);
406         }
407
408         RETURN(rc);
409 }
410
411 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
412 static int mgs_handle_target_reg(struct ptlrpc_request *req)
413 {
414         struct obd_device *obd = req->rq_export->exp_obd;
415         struct lustre_handle lockh;
416         struct mgs_target_info *mti, *rep_mti;
417         int rc = 0, lockrc;
418         ENTRY;
419
420         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
421
422         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
423         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
424                                 LDD_F_UPDATE))) {
425                 /* We're just here as a startup ping. */
426                 CDEBUG(D_MGS, "Server %s is running on %s\n",
427                        mti->mti_svname, obd_export_nid2str(req->rq_export));
428                 rc = mgs_check_target(obd, mti);
429                 /* above will set appropriate mti flags */
430                 if (rc <= 0)
431                         /* Nothing wrong, or fatal error */
432                         GOTO(out_nolock, rc);
433         }
434
435         /* Revoke the config lock to make sure nobody is reading. */
436         /* Although actually I think it should be alright if
437            someone was reading while we were updating the logs - if we
438            revoke at the end they will just update from where they left off. */
439         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
440         if (lockrc != ELDLM_OK) {
441                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
442                                    "update their configuration (%d). Updating "
443                                    "local logs anyhow; you might have to "
444                                    "manually restart other nodes to get the "
445                                    "latest configuration.\n",
446                                    obd->obd_name, lockrc);
447         }
448
449         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
450
451         /* Log writing contention is handled by the fsdb_sem */
452
453         if (mti->mti_flags & LDD_F_WRITECONF) {
454                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
455                     mti->mti_stripe_index == 0) {
456                         rc = mgs_erase_logs(obd, mti->mti_fsname);
457                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
458                                       "request.  All servers must be restarted "
459                                       "in order to regenerate the logs."
460                                       "\n", obd->obd_name, mti->mti_fsname);
461                 } else if (mti->mti_flags &
462                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
463                         rc = mgs_erase_log(obd, mti->mti_svname);
464                         LCONSOLE_WARN("%s: Regenerating %s log by user "
465                                       "request.\n",
466                                       obd->obd_name, mti->mti_svname);
467                 }
468                 mti->mti_flags |= LDD_F_UPDATE;
469                 /* Erased logs means start from scratch. */
470                 mti->mti_flags &= ~LDD_F_UPGRADE14;
471         }
472
473         /* COMPAT_146 */
474         if (mti->mti_flags & LDD_F_UPGRADE14) {
475                 rc = mgs_upgrade_sv_14(obd, mti);
476                 if (rc) {
477                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
478                         GOTO(out, rc);
479                 }
480
481                 /* We're good to go */
482                 mti->mti_flags |= LDD_F_UPDATE;
483         }
484         /* end COMPAT_146 */
485
486         if (mti->mti_flags & LDD_F_UPDATE) {
487                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
488                        mti->mti_stripe_index);
489
490                 /* create or update the target log
491                    and update the client/mdt logs */
492                 rc = mgs_write_log_target(obd, mti);
493                 if (rc) {
494                         CERROR("Failed to write %s log (%d)\n",
495                                mti->mti_svname, rc);
496                         GOTO(out, rc);
497                 }
498
499                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
500                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
501                                     LDD_F_UPGRADE14);
502                 mti->mti_flags |= LDD_F_REWRITE_LDD;
503         }
504
505 out:
506         /* done with log update */
507         if (lockrc == ELDLM_OK)
508                 mgs_put_cfg_lock(&lockh);
509 out_nolock:
510         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
511                mti->mti_stripe_index, rc);
512         rc = req_capsule_server_pack(&req->rq_pill);
513         if (rc)
514                 RETURN(rc);
515
516         /* send back the whole mti in the reply */
517         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
518         *rep_mti = *mti;
519
520         /* Flush logs to disk */
521         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
522         RETURN(rc);
523 }
524
525 static int mgs_set_info_rpc(struct ptlrpc_request *req)
526 {
527         struct obd_device *obd = req->rq_export->exp_obd;
528         struct mgs_send_param *msp, *rep_msp;
529         struct lustre_handle lockh;
530         int rc;
531         struct lustre_cfg_bufs bufs;
532         struct lustre_cfg *lcfg;
533         char fsname[MTI_NAME_MAXLEN];
534         ENTRY;
535
536         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
537         LASSERT(msp);
538
539         /* Construct lustre_cfg structure to pass to function mgs_setparam */
540         lustre_cfg_bufs_reset(&bufs, NULL);
541         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
542         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
543         rc = mgs_setparam(obd, lcfg, fsname);
544         if (rc) {
545                 CERROR("Error %d in setting the parameter %s for fs %s\n",
546                        rc, msp->mgs_param, fsname);
547                 RETURN(rc);
548         }
549
550         /* request for update */
551         mgs_revoke_lock(obd, fsname, &lockh);
552
553         lustre_cfg_free(lcfg);
554
555         rc = req_capsule_server_pack(&req->rq_pill);
556         if (rc == 0) {
557                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
558                 rep_msp = msp;
559         }
560         RETURN(rc);
561 }
562
563 /* Called whenever a target cleans up. */
564 /* XXX - Currently unused */
565 static int mgs_handle_target_del(struct ptlrpc_request *req)
566 {
567         ENTRY;
568         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
569         RETURN(0);
570 }
571
572 /* XXX - Currently unused */
573 static int mgs_handle_exception(struct ptlrpc_request *req)
574 {
575         ENTRY;
576         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
577         RETURN(0);
578 }
579
580 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
581 int mgs_handle(struct ptlrpc_request *req)
582 {
583         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
584         int opc, rc = 0;
585         ENTRY;
586
587         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
588         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
589         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
590                 RETURN(0);
591
592         LASSERT(current->journal_info == NULL);
593         opc = lustre_msg_get_opc(req->rq_reqmsg);
594         if (opc != MGS_CONNECT) {
595                 if (req->rq_export == NULL) {
596                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
597                                opc);
598                         req->rq_status = -ENOTCONN;
599                         GOTO(out, rc = -ENOTCONN);
600                 }
601         }
602
603         switch (opc) {
604         case MGS_CONNECT:
605                 DEBUG_REQ(D_MGS, req, "connect");
606                 /* MGS and MDS have same request format for connect */
607                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
608                 rc = target_handle_connect(req);
609                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
610                         /* Make clients trying to reconnect after a MGS restart
611                            happy; also requires obd_replayable */
612                         lustre_msg_add_op_flags(req->rq_repmsg,
613                                                 MSG_CONNECT_RECONNECT);
614                 break;
615         case MGS_DISCONNECT:
616                 DEBUG_REQ(D_MGS, req, "disconnect");
617                 /* MGS and MDS have same request format for disconnect */
618                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
619                 rc = target_handle_disconnect(req);
620                 req->rq_status = rc;            /* superfluous? */
621                 break;
622         case MGS_EXCEPTION:
623                 DEBUG_REQ(D_MGS, req, "exception");
624                 rc = mgs_handle_exception(req);
625                 break;
626         case MGS_TARGET_REG:
627                 DEBUG_REQ(D_MGS, req, "target add");
628                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
629                 rc = mgs_handle_target_reg(req);
630                 break;
631         case MGS_TARGET_DEL:
632                 DEBUG_REQ(D_MGS, req, "target del");
633                 rc = mgs_handle_target_del(req);
634                 break;
635         case MGS_SET_INFO:
636                 DEBUG_REQ(D_MGS, req, "set_info");
637                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
638                 rc = mgs_set_info_rpc(req);
639                 break;
640
641         case LDLM_ENQUEUE:
642                 DEBUG_REQ(D_MGS, req, "enqueue");
643                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
644                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
645                                          ldlm_server_blocking_ast, NULL);
646                 break;
647         case LDLM_BL_CALLBACK:
648         case LDLM_CP_CALLBACK:
649                 DEBUG_REQ(D_MGS, req, "callback");
650                 CERROR("callbacks should not happen on MGS\n");
651                 LBUG();
652                 break;
653
654         case OBD_PING:
655                 DEBUG_REQ(D_INFO, req, "ping");
656                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
657                 rc = target_handle_ping(req);
658                 break;
659         case OBD_LOG_CANCEL:
660                 DEBUG_REQ(D_MGS, req, "log cancel");
661                 rc = -ENOTSUPP; /* la la la */
662                 break;
663
664         case LLOG_ORIGIN_HANDLE_CREATE:
665                 DEBUG_REQ(D_MGS, req, "llog_init");
666                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
667                 rc = llog_origin_handle_create(req);
668                 break;
669         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
670                 DEBUG_REQ(D_MGS, req, "llog next block");
671                 req_capsule_set(&req->rq_pill,
672                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
673                 rc = llog_origin_handle_next_block(req);
674                 break;
675         case LLOG_ORIGIN_HANDLE_READ_HEADER:
676                 DEBUG_REQ(D_MGS, req, "llog read header");
677                 req_capsule_set(&req->rq_pill,
678                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
679                 rc = llog_origin_handle_read_header(req);
680                 break;
681         case LLOG_ORIGIN_HANDLE_CLOSE:
682                 DEBUG_REQ(D_MGS, req, "llog close");
683                 rc = llog_origin_handle_close(req);
684                 break;
685         case LLOG_CATINFO:
686                 DEBUG_REQ(D_MGS, req, "llog catinfo");
687                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
688                 rc = llog_catinfo(req);
689                 break;
690         default:
691                 req->rq_status = -ENOTSUPP;
692                 rc = ptlrpc_error(req);
693                 RETURN(rc);
694         }
695
696         LASSERT(current->journal_info == NULL);
697
698         if (rc)
699                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
700
701 out:
702         target_send_reply(req, rc, fail);
703         RETURN(0);
704 }
705
706 static inline int mgs_init_export(struct obd_export *exp)
707 {
708         return ldlm_init_export(exp);
709 }
710
711 static inline int mgs_destroy_export(struct obd_export *exp)
712 {
713         ENTRY;
714
715         target_destroy_export(exp);
716         mgs_client_free(exp);
717         ldlm_destroy_export(exp);
718
719         RETURN(0);
720 }
721
722 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
723 {
724         char *ptr;
725
726         ENTRY;
727         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
728                 *fsname = *ptr;
729                 fsname++;
730         }
731         if (*ptr == '\0')
732                 return -EINVAL;
733         *fsname = '\0';
734         ptr++;
735         strcpy(poolname, ptr);
736
737         RETURN(0);
738 }
739
740 static int mgs_iocontrol_pool(struct obd_device *obd,
741                               struct obd_ioctl_data *data)
742 {
743         int rc;
744         struct lustre_handle lockh;
745         struct lustre_cfg *lcfg = NULL;
746         struct llog_rec_hdr rec;
747         char *fsname = NULL;
748         char *poolname = NULL;
749         ENTRY;
750
751         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
752         if (fsname == NULL)
753                 RETURN(-ENOMEM);
754
755         OBD_ALLOC(poolname, MAXPOOLNAME + 1);
756         if (poolname == NULL) {
757                 rc = -ENOMEM;
758                 GOTO(out_pool, rc);
759         }
760         rec.lrh_len = llog_data_len(data->ioc_plen1);
761
762         if (data->ioc_type == LUSTRE_CFG_TYPE) {
763                 rec.lrh_type = OBD_CFG_REC;
764         } else {
765                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
766                 rc = -EINVAL;
767                 GOTO(out_pool, rc);
768         }
769
770         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
771                 rc = -E2BIG;
772                 GOTO(out_pool, rc);
773         }
774
775         OBD_ALLOC(lcfg, data->ioc_plen1);
776         if (lcfg == NULL) {
777                 rc = -ENOMEM;
778                 GOTO(out_pool, rc);
779         }
780         rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
781         if (rc)
782                 GOTO(out_pool, rc);
783
784         if (lcfg->lcfg_bufcount < 2) {
785                 rc = -EINVAL;
786                 GOTO(out_pool, rc);
787         }
788
789         /* first arg is always <fsname>.<poolname> */
790         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
791                             poolname);
792
793         switch (lcfg->lcfg_command) {
794         case LCFG_POOL_NEW: {
795                 if (lcfg->lcfg_bufcount != 2)
796                         RETURN(-EINVAL);
797                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
798                                   poolname, NULL);
799                 break;
800         }
801         case LCFG_POOL_ADD: {
802                 if (lcfg->lcfg_bufcount != 3)
803                         RETURN(-EINVAL);
804                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
805                                   lustre_cfg_string(lcfg, 2));
806                 break;
807         }
808         case LCFG_POOL_REM: {
809                 if (lcfg->lcfg_bufcount != 3)
810                         RETURN(-EINVAL);
811                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
812                                   lustre_cfg_string(lcfg, 2));
813                 break;
814         }
815         case LCFG_POOL_DEL: {
816                 if (lcfg->lcfg_bufcount != 2)
817                         RETURN(-EINVAL);
818                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
819                                   poolname, NULL);
820                 break;
821         }
822         default: {
823                  rc = -EINVAL;
824                  GOTO(out_pool, rc);
825         }
826         }
827
828         if (rc) {
829                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
830                        rc, lcfg->lcfg_command, fsname, poolname);
831                 GOTO(out_pool, rc);
832         }
833
834         /* request for update */
835         mgs_revoke_lock(obd, fsname, &lockh);
836
837 out_pool:
838         if (lcfg != NULL)
839                 OBD_FREE(lcfg, data->ioc_plen1);
840
841         if (fsname != NULL)
842                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
843
844         if (poolname != NULL)
845                 OBD_FREE(poolname, MAXPOOLNAME + 1);
846
847         RETURN(rc);
848 }
849
850 /* from mdt_iocontrol */
851 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
852                   void *karg, void *uarg)
853 {
854         struct obd_device *obd = exp->exp_obd;
855         struct obd_ioctl_data *data = karg;
856         struct lvfs_run_ctxt saved;
857         int rc = 0;
858
859         ENTRY;
860         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
861
862         switch (cmd) {
863
864         case OBD_IOC_PARAM: {
865                 struct lustre_handle lockh;
866                 struct lustre_cfg *lcfg;
867                 struct llog_rec_hdr rec;
868                 char fsname[MTI_NAME_MAXLEN];
869
870                 rec.lrh_len = llog_data_len(data->ioc_plen1);
871
872                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
873                         rec.lrh_type = OBD_CFG_REC;
874                 } else {
875                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
876                         RETURN(-EINVAL);
877                 }
878
879                 OBD_ALLOC(lcfg, data->ioc_plen1);
880                 if (lcfg == NULL)
881                         RETURN(-ENOMEM);
882                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
883                 if (rc)
884                         GOTO(out_free, rc);
885
886                 if (lcfg->lcfg_bufcount < 1)
887                         GOTO(out_free, rc = -EINVAL);
888
889                 rc = mgs_setparam(obd, lcfg, fsname);
890                 if (rc) {
891                         CERROR("setparam err %d\n", rc);
892                         GOTO(out_free, rc);
893                 }
894
895                 /* Revoke lock so everyone updates.  Should be alright if
896                    someone was already reading while we were updating the logs,
897                    so we don't really need to hold the lock while we're
898                    writing (above). */
899                 mgs_revoke_lock(obd, fsname, &lockh);
900
901 out_free:
902                 OBD_FREE(lcfg, data->ioc_plen1);
903                 RETURN(rc);
904         }
905
906         case OBD_IOC_POOL: {
907                 RETURN(mgs_iocontrol_pool(obd, data));
908         }
909
910         case OBD_IOC_DUMP_LOG: {
911                 struct llog_ctxt *ctxt;
912                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
913                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
914                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
915                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
916                 llog_ctxt_put(ctxt);
917
918                 RETURN(rc);
919         }
920
921         case OBD_IOC_LLOG_CHECK:
922         case OBD_IOC_LLOG_INFO:
923         case OBD_IOC_LLOG_PRINT: {
924                 struct llog_ctxt *ctxt;
925                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
926
927                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
928                 rc = llog_ioctl(ctxt, cmd, data);
929                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
930                 llog_ctxt_put(ctxt);
931
932                 RETURN(rc);
933         }
934
935         default:
936                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
937                 RETURN(-EINVAL);
938         }
939         RETURN(0);
940 }
941
942 /* use obd ops to offer management infrastructure */
943 static struct obd_ops mgs_obd_ops = {
944         .o_owner           = THIS_MODULE,
945         .o_connect         = mgs_connect,
946         .o_reconnect       = mgs_reconnect,
947         .o_disconnect      = mgs_disconnect,
948         .o_setup           = mgs_setup,
949         .o_precleanup      = mgs_precleanup,
950         .o_cleanup         = mgs_cleanup,
951         .o_init_export     = mgs_init_export,
952         .o_destroy_export  = mgs_destroy_export,
953         .o_iocontrol       = mgs_iocontrol,
954         .o_llog_init       = mgs_llog_init,
955         .o_llog_finish     = mgs_llog_finish
956 };
957
958 static int __init mgs_init(void)
959 {
960         struct lprocfs_static_vars lvars;
961
962         lprocfs_mgs_init_vars(&lvars);
963         class_register_type(&mgs_obd_ops, NULL,
964                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
965
966         return 0;
967 }
968
969 static void /*__exit*/ mgs_exit(void)
970 {
971         class_unregister_type(LUSTRE_MGS_NAME);
972 }
973
974 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
975 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
976 MODULE_LICENSE("GPL");
977
978 module_init(mgs_init);
979 module_exit(mgs_exit);