Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / mgc / mgc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgc/mgc_request.c
5  *  Lustre Management Client config llog handling
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  */
30  
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_MGC
35 #define D_MGC D_CONFIG|D_WARNING
36
37 #ifdef __KERNEL__
38 # include <linux/module.h>
39 # include <linux/pagemap.h>
40 # include <linux/miscdevice.h>
41 # include <linux/init.h>
42 #else
43 # include <liblustre.h>
44 #endif
45
46 #include <linux/obd_class.h>
47 #include <linux/lustre_dlm.h>
48 #include <linux/lustre_log.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/lustre_disk.h>
51
52
53 int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id)
54 {
55         char *name_end;
56         int len;
57         __u64 resname = 0;
58         
59         /* fsname is at most 8 chars long at the beginning of the logname
60            e.g. "lustre-MDT0001" or "lustre" */
61         name_end = strrchr(logname, '-');
62         if (name_end)
63                 len = name_end - logname;
64         else
65                 len = strlen(logname);
66         if (len > 8) {
67                 CERROR("fsname too long: %s\n", logname);
68                 return -EINVAL;
69         }
70         memcpy(&resname, logname, len);
71
72         memset(res_id, 0, sizeof(*res_id));
73         /* FIXME are resid names swabbed across the wire? */
74         res_id->name[0] = cpu_to_le64(resname);
75         CDEBUG(D_MGC, "log %s to resid "LPX64"/"LPX64" (%.8s)\n", logname,
76                res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
77         return 0;
78 }
79 EXPORT_SYMBOL(mgc_logname2resid);
80
81 /********************** config llog list **********************/
82 static struct list_head config_llog_list = LIST_HEAD_INIT(config_llog_list);
83 static spinlock_t       config_list_lock = SPIN_LOCK_UNLOCKED;
84
85 static int config_log_get(struct config_llog_data *cld)
86 {
87         ENTRY;
88         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
89                atomic_read(&cld->cld_refcount));
90         atomic_inc(&cld->cld_refcount);
91         if (cld->cld_stopping) {
92                 atomic_dec(&cld->cld_refcount);
93                 RETURN(1);
94         }
95         RETURN(0);
96 }
97
98 static void config_log_put(struct config_llog_data *cld)
99 {
100         ENTRY;
101         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
102                atomic_read(&cld->cld_refcount));
103         if (atomic_dec_and_test(&cld->cld_refcount)) {
104                 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
105                 OBD_FREE(cld->cld_logname, strlen(cld->cld_logname) + 1);
106                 if (cld->cld_cfg.cfg_instance != NULL)
107                         OBD_FREE(cld->cld_cfg.cfg_instance, 
108                                  strlen(cld->cld_cfg.cfg_instance) + 1);
109                 OBD_FREE(cld, sizeof(*cld));
110         }
111         EXIT;
112 }
113
114 static struct config_llog_data *config_log_find(char *logname, 
115                                                struct config_llog_instance *cfg)
116 {
117         struct list_head *tmp;
118         struct config_llog_data *cld;
119         char *logid = logname;
120         int match_instance = 0;
121         ENTRY;
122
123         if (cfg && cfg->cfg_instance) {
124                 match_instance++;
125                 logid = cfg->cfg_instance;
126         }
127         if (!logid) {
128                 CERROR("No log specified\n");
129                 RETURN(ERR_PTR(-EINVAL));
130         }
131
132         spin_lock(&config_list_lock);
133         list_for_each(tmp, &config_llog_list) {
134                 cld = list_entry(tmp, struct config_llog_data, cld_list_chain);
135                 if (match_instance && cld->cld_cfg.cfg_instance && 
136                     strcmp(logid, cld->cld_cfg.cfg_instance) == 0)
137                         goto out_found;
138                 if (!match_instance &&  
139                     strcmp(logid, cld->cld_logname) == 0)
140                         goto out_found;
141         }
142         spin_unlock(&config_list_lock);
143
144         CERROR("can't get log %s\n", logid);
145         RETURN(ERR_PTR(-ENOENT));
146 out_found:
147         atomic_inc(&cld->cld_refcount);
148         spin_unlock(&config_list_lock);
149         RETURN(cld);
150 }
151
152 /* Add this log to our list of active logs. 
153    We have one active log per "mount" - client instance or servername.
154    Each instance may be at a different point in the log. */
155 static int config_log_add(char *logname, struct config_llog_instance *cfg,
156                           struct super_block *sb)
157 {
158         struct config_llog_data *cld;
159         int rc;
160         ENTRY;
161
162         CDEBUG(D_MGC, "adding config log %s:%s\n", logname, cfg->cfg_instance);
163         
164         OBD_ALLOC(cld, sizeof(*cld));
165         if (!cld) 
166                 RETURN(-ENOMEM);
167         OBD_ALLOC(cld->cld_logname, strlen(logname) + 1);
168         if (!cld->cld_logname) { 
169                 OBD_FREE(cld, sizeof(*cld));
170                 RETURN(-ENOMEM);
171         }
172         strcpy(cld->cld_logname, logname);
173         cld->cld_cfg = *cfg;
174         cld->cld_cfg.cfg_last_idx = 0;
175         cld->cld_cfg.cfg_flags = 0;
176         cld->cld_cfg.cfg_sb = sb;
177         atomic_set(&cld->cld_refcount, 1);
178         if (cfg->cfg_instance != NULL) {
179                 OBD_ALLOC(cld->cld_cfg.cfg_instance, 
180                           strlen(cfg->cfg_instance) + 1);
181                 strcpy(cld->cld_cfg.cfg_instance, cfg->cfg_instance);
182         }
183         rc = mgc_logname2resid(logname, &cld->cld_resid);
184         if (rc) {
185                 config_log_put(cld);
186                 RETURN(rc);
187         }
188         spin_lock(&config_list_lock);
189         list_add(&cld->cld_list_chain, &config_llog_list);
190         spin_unlock(&config_list_lock);
191         
192         RETURN(rc);
193 }
194
195 /* Stop watching for updates on this log. */
196 static int config_log_end(char *logname, struct config_llog_instance *cfg)
197 {       
198         struct config_llog_data *cld;
199         int rc = 0;
200         ENTRY;
201                                        
202         cld = config_log_find(logname, cfg);
203         if (IS_ERR(cld)) 
204                 RETURN(PTR_ERR(cld));
205         /* drop the ref from the find */
206         config_log_put(cld);
207
208         cld->cld_stopping = 1;
209         spin_lock(&config_list_lock);
210         list_del(&cld->cld_list_chain);
211         spin_unlock(&config_list_lock);
212         /* drop the start ref */
213         config_log_put(cld);
214         CDEBUG(D_MGC, "end config log %s (%d)\n", logname, rc);
215         RETURN(rc);
216 }
217
218 /* Failsafe */
219 static void config_log_end_all(void)
220 {
221         struct list_head *tmp, *n;
222         struct config_llog_data *cld;
223         ENTRY;
224         
225         spin_lock(&config_list_lock);
226         list_for_each_safe(tmp, n, &config_llog_list) {
227                 cld = list_entry(tmp, struct config_llog_data, cld_list_chain);
228                 CERROR("conflog failsafe %s\n", cld->cld_logname);
229                 list_del(&cld->cld_list_chain);
230                 config_log_put(cld);
231         }
232         spin_unlock(&config_list_lock);
233         EXIT;
234 }
235
236
237 /********************** class fns **********************/
238
239 static int mgc_fs_setup(struct obd_device *obd, struct super_block *sb, 
240                         struct vfsmount *mnt)
241 {
242         struct lvfs_run_ctxt saved;
243         struct lustre_sb_info *lsi = s2lsi(sb);
244         struct client_obd *cli = &obd->u.cli;
245         struct dentry *dentry;
246         int err = 0;
247         ENTRY;
248
249         LASSERT(lsi);
250         LASSERT(lsi->lsi_srv_mnt == mnt);
251
252         /* The mgc fs exclusion sem. Only one fs can be setup at a time.
253            Maybe just overload the cl_sem? */
254         down(&cli->cl_mgc_sem);
255
256         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
257         if (IS_ERR(obd->obd_fsops)) {
258                 up(&cli->cl_mgc_sem);
259                 CERROR("No fstype %s rc=%ld\n", MT_STR(lsi->lsi_ldd), 
260                        PTR_ERR(obd->obd_fsops));
261                 RETURN(PTR_ERR(obd->obd_fsops));
262         }
263
264         cli->cl_mgc_vfsmnt = mnt;
265         // FIXME which is the right SB? - filter_common_setup also 
266         CDEBUG(D_MGC, "SB's: fill=%p mnt=%p root=%p\n", sb, mnt->mnt_sb,
267                mnt->mnt_root->d_inode->i_sb);
268         fsfilt_setup(obd, mnt->mnt_sb);
269
270         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
271         obd->obd_lvfs_ctxt.pwdmnt = mnt;
272         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
273         obd->obd_lvfs_ctxt.fs = get_ds();
274
275         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
276         dentry = lookup_one_len(MOUNT_CONFIGS_DIR, current->fs->pwd,
277                                 strlen(MOUNT_CONFIGS_DIR));
278         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
279         if (IS_ERR(dentry)) {
280                 err = PTR_ERR(dentry);
281                 CERROR("cannot lookup %s directory: rc = %d\n", 
282                        MOUNT_CONFIGS_DIR, err);
283                 GOTO(err_ops, err);
284         }
285         cli->cl_mgc_configs_dir = dentry;
286
287         /* We keep the cl_mgc_sem until mgc_fs_cleanup */
288         RETURN(0);
289
290 err_ops:        
291         fsfilt_put_ops(obd->obd_fsops);
292         obd->obd_fsops = NULL;
293         cli->cl_mgc_vfsmnt = NULL;
294         up(&cli->cl_mgc_sem);
295         RETURN(err);
296 }
297
298 static int mgc_fs_cleanup(struct obd_device *obd)
299 {
300         struct client_obd *cli = &obd->u.cli;
301         int rc = 0;
302         ENTRY;
303
304         LASSERT(cli->cl_mgc_vfsmnt != NULL);
305
306         if (cli->cl_mgc_configs_dir != NULL) {
307                 struct lvfs_run_ctxt saved;
308                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
309                 l_dput(cli->cl_mgc_configs_dir);
310                 cli->cl_mgc_configs_dir = NULL; 
311                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
312         }
313
314         cli->cl_mgc_vfsmnt = NULL;
315         if (obd->obd_fsops) 
316                 fsfilt_put_ops(obd->obd_fsops);
317         
318         up(&cli->cl_mgc_sem);
319         RETURN(rc);
320 }
321
322 static int mgc_cleanup(struct obd_device *obd)
323 {
324         struct client_obd *cli = &obd->u.cli;
325         int rc;
326
327         /* FIXME calls to mgc_fs_setup must take an obd ref to insure there's
328            no fs by the time we get here. */
329         LASSERT(cli->cl_mgc_vfsmnt == NULL);
330         
331         rc = obd_llog_finish(obd, 0);
332         if (rc != 0)
333                 CERROR("failed to cleanup llogging subsystems\n");
334
335         ptlrpcd_decref();
336
337         config_log_end_all();
338
339         return client_obd_cleanup(obd);
340 }
341
342 static struct obd_device *the_mgc;
343
344 static int mgc_setup(struct obd_device *obd, obd_count len, void *buf)
345 {
346         int rc;
347         ENTRY;
348
349         ptlrpcd_addref();
350
351         rc = client_obd_setup(obd, len, buf);
352         if (rc)
353                 GOTO(err_decref, rc);
354
355         rc = obd_llog_init(obd, obd, 0, NULL);
356         if (rc) {
357                 CERROR("failed to setup llogging subsystems\n");
358                 GOTO(err_cleanup, rc);
359         }
360
361         the_mgc = obd;
362         RETURN(rc);
363
364 err_cleanup:
365         client_obd_cleanup(obd);
366 err_decref:
367         ptlrpcd_decref();
368         RETURN(rc);
369 }
370
371 static int mgc_process_log(struct obd_device *mgc, 
372                            struct config_llog_data *cld);
373
374 /* FIXME I don't want a thread for every cld; make a list of cld's to requeue
375    and use only 1 thread. */
376 /* reenqueue the lock, reparse the log */
377 static int mgc_async_requeue(void *data)
378 {
379         wait_queue_head_t   waitq;
380         struct l_wait_info  lwi;
381         struct config_llog_data *cld = (struct config_llog_data *)data;
382         unsigned long flags;
383         int rc = 0;
384         ENTRY;
385
386         if (!data) 
387                 RETURN(-EINVAL);
388         if (cld->cld_stopping) 
389                 GOTO(out, rc = 0);
390
391         lock_kernel();
392         ptlrpc_daemonize();
393         SIGNAL_MASK_LOCK(current, flags);
394         sigfillset(&current->blocked);
395         RECALC_SIGPENDING;
396         SIGNAL_MASK_UNLOCK(current, flags);
397         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "reQ %s", 
398                     cld->cld_logname);
399         unlock_kernel();
400
401         CDEBUG(D_MGC, "requeue "LPX64" %s:%s\n", 
402                cld->cld_resid.name[0], cld->cld_logname, 
403                cld->cld_cfg.cfg_instance);
404         
405         /* Sleep a few seconds to allow the server who caused
406            the lock revocation to finish its setup, plus some random
407            so everyone doesn't try to reconnect at once. */
408         init_waitqueue_head(&waitq);
409         lwi = LWI_TIMEOUT(3 * HZ + (ll_rand() & 0x7f), NULL, NULL);
410         l_wait_event(waitq, 0, &lwi);
411
412         LASSERT(the_mgc);
413
414         class_export_get(the_mgc->obd_self_export);
415 #if 0
416         /* Re-send server info every time, in case MGS needs to regen its
417            logs (for write_conf).  Do we need this?  It's extra RPCs for
418            every server at every update.  Turning it off until I'm sure
419            it's needed. */
420         server_register_target(cld->cld_cfg.cfg_sb);
421 #endif 
422         rc = mgc_process_log(the_mgc, cld);
423         class_export_put(the_mgc->obd_self_export);
424 out:
425         /* Whether we enqueued again or not in mgc_process_log, 
426            we're done with the ref from the old mgc_blocking_ast */        
427         config_log_put(cld);                                                    
428
429         RETURN(rc);
430 }
431
432 /* based on ll_mdc_blocking_ast */
433 static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
434                             void *data, int flag)
435 {
436         struct lustre_handle lockh;
437         struct config_llog_data *cld = (struct config_llog_data *)data;
438         int rc = 0;
439         ENTRY;
440
441         switch (flag) {
442         case LDLM_CB_BLOCKING:
443                 /* mgs wants the lock, give it up... */
444                 LDLM_DEBUG(lock, "MGC blocking CB");
445                 ldlm_lock2handle(lock, &lockh);
446                 rc = ldlm_cli_cancel(&lockh);
447                 break;
448         case LDLM_CB_CANCELING: {
449                 /* We've given up the lock, prepare ourselves to update. */
450                 LDLM_DEBUG(lock, "MGC cancel CB");
451                 
452                 CDEBUG(D_MGC, "Lock res "LPX64" (%.8s)\n",
453                        lock->l_resource->lr_name.name[0], 
454                        (char *)&lock->l_resource->lr_name.name[0]);
455
456                 /* Make sure not to re-enqueue when the mgc is stopping
457                    (we get called from client_disconnect_export) */
458                 if (!lock->l_conn_export ||
459                     !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
460                         CDEBUG(D_MGC, "Disconnecting, don't requeue\n");
461                         goto out_drop;
462                 }
463                 if (lock->l_req_mode != lock->l_granted_mode) {
464                         CERROR("original grant failed, won't requeue\n");
465                         goto out_drop;
466                 }
467                 if (!data) {
468                         CERROR("missing data, won't requeue\n");
469                         goto out_drop;
470                 }
471                 if (cld->cld_stopping) {
472                         CERROR("stopping, won't requeue\n");
473                         goto out_drop;
474                 }
475
476                 /* Re-enqueue the lock in a separate thread, because we must
477                    return from this fn before that lock can be taken. */
478                 rc = kernel_thread(mgc_async_requeue, data,
479                                    CLONE_VM | CLONE_FS);
480                 if (rc < 0) {
481                         CERROR("Cannot re-enqueue thread: %d\n", rc);
482                 } else {
483                         rc = 0;
484                         break;
485                 }
486 out_drop:
487                 /* Drop this here or in mgc_async_requeue,
488                    in either case, we're done with the reference
489                    after this. */
490                 config_log_put(cld);    
491                 break;
492         }
493         default:
494                 LBUG();
495         }
496
497
498         if (rc) {
499                 CERROR("%s CB failed %d:\n", flag == LDLM_CB_BLOCKING ? 
500                        "blocking" : "cancel", rc);
501                 LDLM_ERROR(lock, "MGC ast");
502         }
503         RETURN(rc);
504 }
505
506 /* Take a config lock so we can get cancel notifications */
507 static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
508                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
509                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
510                        void *data, __u32 lvb_len, void *lvb_swabber,
511                        struct lustre_handle *lockh)
512 {                       
513         struct config_llog_data *cld = (struct config_llog_data *)data;
514         struct obd_device *obd = class_exp2obd(exp);
515         int rc;
516         ENTRY;
517
518         CDEBUG(D_MGC, "Enqueue for %s (res "LPX64")\n", cld->cld_logname,
519                cld->cld_resid.name[0]);
520                 
521         /* We can only drop this config log ref when we drop the lock */
522         if (config_log_get(cld))
523                 RETURN(ELDLM_LOCK_ABORTED);
524
525         /* We need a callback for every lockholder, so don't try to
526            ldlm_lock_match (see rev 1.1.2.11.2.47) */
527
528         rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, cld->cld_resid,
529                               type, NULL, mode, flags, 
530                               mgc_blocking_ast, ldlm_completion_ast, NULL,
531                               data, NULL, 0, NULL, lockh);
532
533         RETURN(rc);
534 }
535
536 static int mgc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
537                       __u32 mode, struct lustre_handle *lockh)
538 {
539         ENTRY;
540
541         ldlm_lock_decref(lockh, mode);
542
543         RETURN(0);
544 }
545
546 #if 0
547 static int mgc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
548                          void *karg, void *uarg)
549 {
550         struct obd_device *obd = exp->exp_obd;
551         struct obd_ioctl_data *data = karg;
552         struct llog_ctxt *ctxt;
553         struct lvfs_run_ctxt saved;
554         int rc;
555         ENTRY;
556
557 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
558         MOD_INC_USE_COUNT;
559 #else
560         if (!try_module_get(THIS_MODULE)) {
561                 CERROR("Can't get module. Is it alive?");
562                 return -EINVAL;
563         }
564 #endif
565         switch (cmd) {
566         /* REPLicator context */
567         case OBD_IOC_PARSE: {
568                 CERROR("MGC parsing llog %s\n", data->ioc_inlbuf1);
569                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
570                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
571                 GOTO(out, rc);
572         }
573 #ifdef __KERNEL__
574         case OBD_IOC_LLOG_INFO:
575         case OBD_IOC_LLOG_PRINT: {
576                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
577                 rc = llog_ioctl(ctxt, cmd, data);
578
579                 GOTO(out, rc);
580         }
581 #endif
582         /* ORIGinator context */
583         case OBD_IOC_DUMP_LOG: {
584                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
585                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
586                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
587                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
588                 if (rc)
589                         RETURN(rc);
590
591                 GOTO(out, rc);
592         }
593         default:
594                 CERROR("mgc_ioctl(): unrecognised ioctl %#x\n", cmd);
595                 GOTO(out, rc = -ENOTTY);
596         }
597 out:
598 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
599         MOD_DEC_USE_COUNT;
600 #else
601         module_put(THIS_MODULE);
602 #endif
603
604         return rc;
605 }
606 #endif
607
608 /* Send target_reg message to MGS */
609 static int mgc_target_register(struct obd_export *exp,
610                                struct mgs_target_info *mti)
611 {
612         struct ptlrpc_request *req;
613         struct mgs_target_info *req_mti, *rep_mti;
614         int size = sizeof(*req_mti);
615         int rep_size = sizeof(*mti);
616         int rc;
617         ENTRY;
618
619         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MGS_VERSION,
620                               MGS_TARGET_REG, 1, &size, NULL);
621         if (!req)
622                 RETURN(-ENOMEM);
623
624         req_mti = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_mti));
625         if (!req_mti) 
626                 RETURN(-ENOMEM);
627         memcpy(req_mti, mti, sizeof(*req_mti));
628
629         req->rq_replen = lustre_msg_size(1, &rep_size);
630
631         CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
632         
633         rc = ptlrpc_queue_wait(req);
634         if (!rc) {
635                 rep_mti = lustre_swab_repbuf(req, 0, sizeof(*rep_mti),
636                                              lustre_swab_mgs_target_info);
637                 memcpy(mti, rep_mti, sizeof(*rep_mti));
638                 CDEBUG(D_MGC, "register %s got index = %d\n",
639                        mti->mti_svname, mti->mti_stripe_index);
640         } else {
641                 CERROR("register failed. rc=%d\n", rc);
642         }
643         ptlrpc_req_finished(req);
644
645         RETURN(rc);
646 }
647
648 int mgc_set_info(struct obd_export *exp, obd_count keylen,
649                  void *key, obd_count vallen, void *val)
650 {
651         struct obd_import *imp = class_exp2cliimp(exp);
652         int rc = -EINVAL;
653         ENTRY;
654
655         /* Try to "recover" the initial connection; i.e. retry */
656         if (KEY_IS(KEY_INIT_RECOV)) {
657                 if (vallen != sizeof(int))
658                         RETURN(-EINVAL);
659                 imp->imp_initial_recov = *(int *)val;
660                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
661                        exp->exp_obd->obd_name, imp->imp_initial_recov);
662                 RETURN(0);
663         }
664         /* Turn off initial_recov after we try all backup servers once */
665         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
666                 if (vallen != sizeof(int))
667                         RETURN(-EINVAL);
668                 imp->imp_initial_recov_bk = *(int *)val;
669                 CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n",
670                        exp->exp_obd->obd_name, imp->imp_initial_recov_bk);
671                 if (imp->imp_invalid) {
672                         /* Resurrect if we previously died */
673                         CDEBUG(D_MGC, "Reactivate %s %d:%d:%d\n", 
674                                imp->imp_obd->obd_name,
675                                imp->imp_deactive, imp->imp_invalid, 
676                                imp->imp_state);
677                         /* can't put this in obdclass, module loop with ptlrpc*/
678                         /* This seems to be necessary when restarting a 
679                            combo mgs/mdt while the mgc is alive */
680                         ptlrpc_invalidate_import(imp);
681                         /* Remove 'invalid' flag */
682                         ptlrpc_activate_import(imp);
683                         /* Attempt a new connect */
684                         ptlrpc_recover_import(imp, NULL);
685                 }
686                 RETURN(0);
687         }
688         /* Hack alert */
689         if (KEY_IS("register_target")) {
690                 struct mgs_target_info *mti;
691                 if (vallen != sizeof(struct mgs_target_info))
692                         RETURN(-EINVAL);
693                 mti = (struct mgs_target_info *)val;
694                 CDEBUG(D_MGC, "register_target %s %#x\n",
695                        mti->mti_svname, mti->mti_flags);
696                 rc =  mgc_target_register(exp, mti);
697                 RETURN(rc);
698         }
699         if (KEY_IS("set_fs")) {
700                 struct super_block *sb = (struct super_block *)val;
701                 struct lustre_sb_info *lsi;
702                 if (vallen != sizeof(struct super_block))
703                         RETURN(-EINVAL);
704                 lsi = s2lsi(sb);
705                 rc = mgc_fs_setup(exp->exp_obd, sb, lsi->lsi_srv_mnt);
706                 if (rc) {
707                         CERROR("set_fs got %d\n", rc);
708                 }
709                 RETURN(rc);
710         }
711         if (KEY_IS("clear_fs")) {
712                 if (vallen != 0)
713                         RETURN(-EINVAL);
714                 rc = mgc_fs_cleanup(exp->exp_obd);
715                 if (rc) {
716                         CERROR("clear_fs got %d\n", rc);
717                 }
718                 RETURN(rc);
719         }
720
721         RETURN(rc);
722 }               
723
724 static int mgc_import_event(struct obd_device *obd,
725                             struct obd_import *imp,
726                             enum obd_import_event event)
727 {
728         int rc = 0;
729
730         LASSERT(imp->imp_obd == obd);
731         CDEBUG(D_MGC, "import event %#x\n", event);
732
733         switch (event) {
734         case IMP_EVENT_INVALIDATE: {
735                 struct ldlm_namespace *ns = obd->obd_namespace;
736
737                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
738
739                 break;
740         }
741         case IMP_EVENT_DISCON: 
742         case IMP_EVENT_INACTIVE: 
743         case IMP_EVENT_ACTIVE: 
744         case IMP_EVENT_OCD:
745                 break;
746         default:
747                 CERROR("Unknown import event %#x\n", event);
748                 LBUG();
749         }
750         RETURN(rc);
751 }
752
753 static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
754                          int count, struct llog_catid *logid)
755 {
756         struct llog_ctxt *ctxt;
757         int rc;
758         ENTRY;
759
760         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
761                         &llog_lvfs_ops);
762         if (rc)
763                 RETURN(rc);
764
765         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
766                         &llog_client_ops);
767         if (rc == 0) {
768                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
769                 ctxt->loc_imp = obd->u.cli.cl_import;
770         }
771
772         RETURN(rc);
773 }
774
775 static int mgc_llog_finish(struct obd_device *obd, int count)
776 {
777         int rc;
778         ENTRY;
779
780         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
781         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
782
783         RETURN(rc);
784 }
785
786 /* identical to mgs_log_is_empty */
787 static int mgc_llog_is_empty(struct obd_device *obd, struct llog_ctxt *ctxt,
788                             char *name)
789 {
790         struct lvfs_run_ctxt saved;
791         struct llog_handle *llh;
792         int rc = 0;
793
794         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
795         rc = llog_create(ctxt, &llh, NULL, name);
796         if (rc == 0) {
797                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
798                 rc = llog_get_size(llh);
799                 llog_close(llh);
800         }
801         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
802         /* header is record 1 */
803         return(rc <= 1);
804 }
805
806 static int mgc_copy_handler(struct llog_handle *llh, struct llog_rec_hdr *rec, 
807                             void *data)
808 {
809         struct llog_rec_hdr local_rec = *rec;
810         struct llog_handle *local_llh = (struct llog_handle *)data;
811         char *cfg_buf = (char*) (rec + 1);
812         struct lustre_cfg *lcfg;
813         int rc = 0;
814         ENTRY;
815
816         lcfg = (struct lustre_cfg *)cfg_buf;
817
818         /* FIXME we should always write to an empty log, so remove this check.*/
819         /* append new records */
820         if (rec->lrh_index >= llog_get_size(local_llh)) { 
821                 rc = llog_write_rec(local_llh, &local_rec, NULL, 0, 
822                                     (void *)cfg_buf, -1);
823
824                 CDEBUG(D_INFO, "idx=%d, rc=%d, len=%d, cmd %x %s %s\n", 
825                        rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command, 
826                        lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
827         } else {
828                 CDEBUG(D_INFO, "skip idx=%d\n",  rec->lrh_index);
829         }
830
831         RETURN(rc);
832 }
833
834 static int mgc_copy_llog(struct obd_device *obd, struct llog_ctxt *rctxt,
835                          struct llog_ctxt *lctxt, char *logname)
836 {
837         struct llog_handle *local_llh, *remote_llh;
838         struct obd_uuid *uuid;
839         int rc, rc2;
840         ENTRY;
841
842         /* open local log */
843         rc = llog_create(lctxt, &local_llh, NULL, logname);
844         if (rc)
845                 RETURN(rc);
846         /* set the log header uuid for fun */
847         OBD_ALLOC_PTR(uuid);
848         obd_str2uuid(uuid, logname);
849         rc = llog_init_handle(local_llh, LLOG_F_IS_PLAIN, uuid);
850         OBD_FREE_PTR(uuid);
851         if (rc)
852                 GOTO(out_closel, rc);
853
854         /* FIXME write new log to a temp name, then vfs_rename over logname
855            upon successful completion. */
856
857         /* open remote log */
858         rc = llog_create(rctxt, &remote_llh, NULL, logname);
859         if (rc)
860                 GOTO(out_closel, rc);
861         rc = llog_init_handle(remote_llh, LLOG_F_IS_PLAIN, NULL);
862         if (rc)
863                 GOTO(out_closer, rc);
864
865         rc = llog_process(remote_llh, mgc_copy_handler,(void *)local_llh, NULL);
866
867 out_closer:
868         rc2 = llog_close(remote_llh);
869         if (!rc)
870                 rc = rc2;
871 out_closel:
872         rc2 = llog_close(local_llh);
873         if (!rc)
874                 rc = rc2;
875
876         CDEBUG(D_MGC, "Copied remote log %s (%d)\n", logname, rc);
877         RETURN(rc);
878 }
879
880 DECLARE_MUTEX(llog_process_lock);
881
882 /* Get a config log from the MGS and process it.
883    This func is called for both clients and servers. */
884 static int mgc_process_log(struct obd_device *mgc, 
885                            struct config_llog_data *cld)
886 {
887         struct llog_ctxt *ctxt, *lctxt;
888         struct lustre_handle lockh;
889         struct client_obd *cli = &mgc->u.cli;
890         struct lvfs_run_ctxt saved;
891         struct lustre_sb_info *lsi;
892         int rc, rcl, flags = 0, must_pop = 0;
893         ENTRY;
894
895         if (!cld || !cld->cld_cfg.cfg_sb) {
896                 /* This should never happen */
897                 CERROR("Missing cld, aborting log update\n");
898                 RETURN(-EINVAL);
899         }
900         if (cld->cld_stopping) 
901                 RETURN(0);
902
903         lsi = s2lsi(cld->cld_cfg.cfg_sb);
904
905         CDEBUG(D_MGC, "Process log %s:%s from %d\n", cld->cld_logname, 
906                cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
907
908         ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
909         if (!ctxt) {
910                 CERROR("missing llog context\n");
911                 RETURN(-EINVAL);
912         }
913
914         /* I don't want mutliple processes running process_log at once -- 
915            sounds like badness.  It actually might be fine, as long as 
916            we're not trying to update from the same log
917            simultaneously (in which case we should use a per-log sem.) */
918         down(&llog_process_lock);
919
920         /* Get the cfg lock on the llog */
921         rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL, 
922                           LCK_CR, &flags, NULL, NULL, NULL, 
923                           cld, 0, NULL, &lockh);
924         if (rcl) 
925                 CERROR("Can't get cfg lock: %d\n", rcl);
926         
927         lctxt = llog_get_context(mgc, LLOG_CONFIG_ORIG_CTXT);
928
929         /* Copy the setup log locally if we can. Don't mess around if we're 
930            running an MGS though (logs are already local). */
931         if (lctxt && lsi && (lsi->lsi_flags & LSI_SERVER) && 
932             (lsi->lsi_srv_mnt == cli->cl_mgc_vfsmnt) &&
933             !IS_MGS(lsi->lsi_ldd)) {
934                 push_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
935                 must_pop++;
936                 if (rcl == 0) 
937                         /* Only try to copy log if we have the lock. */
938                         rc = mgc_copy_llog(mgc, ctxt, lctxt, cld->cld_logname);
939                 if (rcl || rc) {
940                         if (mgc_llog_is_empty(mgc, lctxt, cld->cld_logname)) {
941                                 LCONSOLE_ERROR("Failed to get MGS log %s "
942                                                "and no local copy.\n",
943                                                cld->cld_logname);
944                                 GOTO(out_pop, rc = -ENOTCONN);
945                         }
946                         LCONSOLE_WARN("Failed to get MGS log %s, using "
947                                       "local copy.\n", cld->cld_logname);
948                 }
949                 /* Now, whether we copied or not, start using the local llog.
950                    If we failed to copy, we'll start using whatever the old 
951                    log has. */
952                 ctxt = lctxt;
953         }
954
955         /* logname and instance info should be the same, so use our 
956            copy of the instance for the update.  The cfg_last_idx will
957            be updated here. */
958         rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
959         
960  out_pop:
961         if (must_pop) 
962                 pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
963
964         /* Now drop the lock so MGS can revoke it */ 
965         if (!rcl) {
966                 rcl = mgc_cancel(mgc->u.cli.cl_mgc_mgsexp, NULL, 
967                                  LCK_CR, &lockh);
968                 if (rcl) 
969                         CERROR("Can't drop cfg lock: %d\n", rcl);
970         }
971         
972         if (rc) {
973                 CERROR("%s: the configuration '%s' could not be read "
974                        "(%d) from the MGS.\n",
975                        mgc->obd_name, cld->cld_logname, rc);
976         }
977
978         up(&llog_process_lock);
979         
980         RETURN(rc);
981 }
982
983 static int mgc_process_config(struct obd_device *obd, obd_count len, void *buf)
984 {
985         struct lustre_cfg *lcfg = buf;
986         int cmd;
987         int rc = 0;
988         ENTRY;
989
990         switch(cmd = lcfg->lcfg_command) {
991         case LCFG_LOV_ADD_OBD: {
992                 struct mgs_target_info *mti;
993
994                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) != 
995                     sizeof(struct mgs_target_info))
996                         GOTO(out, rc = -EINVAL);
997
998                 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
999                 CDEBUG(D_MGC, "add_target %s %#x\n",    
1000                        mti->mti_svname, mti->mti_flags);
1001                 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1002                 break;
1003         }
1004         case LCFG_LOV_DEL_OBD: 
1005                 /* FIXME */
1006                 CERROR("lov_del_obd unimplemented\n");
1007                 rc = -ENOSYS;
1008                 break;
1009         case LCFG_LOG_START: {
1010                 struct config_llog_data *cld;
1011                 struct config_llog_instance *cfg;
1012                 struct super_block *sb;
1013                 char *logname = lustre_cfg_string(lcfg, 1);
1014                 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1015                 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1016                 
1017                 CDEBUG(D_MGC, "parse_log %s from %d\n", logname, 
1018                        cfg->cfg_last_idx);
1019
1020                 /* We're only called through here on the initial mount */
1021                 rc = config_log_add(logname, cfg, sb);
1022                 if (rc) 
1023                         break;
1024                 cld = config_log_find(logname, cfg);
1025                 if (IS_ERR(cld)) {
1026                         rc = PTR_ERR(cld);
1027                         break;
1028                 }
1029                 
1030                 /* COMPAT_146 */
1031                 /* For old logs, there was no start marker. */
1032                 /* FIXME only set this for old logs! */
1033                 cld->cld_cfg.cfg_flags |= CFG_F_MARKER;
1034                 
1035                 rc = mgc_process_log(obd, cld);
1036                 config_log_put(cld);
1037                 
1038                 break;       
1039         }
1040         case LCFG_LOG_END: {
1041                 struct config_llog_instance *cfg = NULL;
1042                 char *logname = lustre_cfg_string(lcfg, 1);
1043                 if (lcfg->lcfg_bufcount >= 2)
1044                         cfg = (struct config_llog_instance *)lustre_cfg_buf(
1045                                 lcfg, 2);
1046                 rc = config_log_end(logname, cfg);
1047                 break;
1048         }
1049         default: {
1050                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1051                 GOTO(out, rc = -EINVAL);
1052
1053         }
1054         }
1055 out:
1056         RETURN(rc);
1057 }
1058
1059 struct obd_ops mgc_obd_ops = {
1060         .o_owner        = THIS_MODULE,
1061         .o_setup        = mgc_setup,
1062         .o_cleanup      = mgc_cleanup,
1063         .o_add_conn     = client_import_add_conn,
1064         .o_del_conn     = client_import_del_conn,
1065         .o_connect      = client_connect_import,
1066         .o_disconnect   = client_disconnect_export,
1067         //.o_enqueue      = mgc_enqueue,
1068         .o_cancel       = mgc_cancel,
1069         //.o_iocontrol    = mgc_iocontrol,
1070         .o_set_info     = mgc_set_info,
1071         .o_import_event = mgc_import_event,
1072         .o_llog_init    = mgc_llog_init,
1073         .o_llog_finish  = mgc_llog_finish,
1074         .o_process_config = mgc_process_config,
1075 };
1076
1077 int __init mgc_init(void)
1078 {
1079         return class_register_type(&mgc_obd_ops, NULL, LUSTRE_MGC_NAME);
1080 }
1081
1082 #ifdef __KERNEL__
1083 static void /*__exit*/ mgc_exit(void)
1084 {
1085         class_unregister_type(LUSTRE_MGC_NAME);
1086 }
1087
1088 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1089 MODULE_DESCRIPTION("Lustre Management Client");
1090 MODULE_LICENSE("GPL");
1091
1092 module_init(mgc_init);
1093 module_exit(mgc_exit);
1094 #endif