Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  *
24  * These are the only exported functions, they provide some generic
25  * infrastructure for managing object devices
26  */
27
28 #define DEBUG_SUBSYSTEM S_CLASS
29 #ifdef __KERNEL__
30 #include <linux/kmod.h>   /* for request_module() */
31 #include <linux/module.h>
32 #else
33 #include <liblustre.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_ost.h>
37 #include <linux/obd_class.h>
38 #include <linux/lprocfs_status.h>
39
/* List of registered obd types; only declared here (the definition is not
 * visible in this part of the file). */
extern struct list_head obd_types;
/* Held by the type functions below while walking or modifying obd_types. */
static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED;

/* Slab cache for struct obdo; created in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
kmem_cache_t *obdo_cachep = NULL;
EXPORT_SYMBOL(obdo_cachep);
/* Slab cache for struct obd_import; created in obd_init_caches() and
 * destroyed in obd_cleanup_caches(). */
kmem_cache_t *import_cachep = NULL;

/* Hooks called from the export/import teardown paths below to release a
 * ptlrpc connection and to abort an import's in-flight requests.
 * NOTE(review): they are never assigned in this part of the file --
 * presumably the ptlrpc module fills them in; confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp);
49
50 /*
51  * support functions: we could use inter-module communication, but this
52  * is more portable to other OS's
53  */
54 struct obd_type *class_search_type(char *name)
55 {
56         struct list_head *tmp;
57         struct obd_type *type;
58
59         spin_lock(&obd_types_lock);
60         list_for_each(tmp, &obd_types) {
61                 type = list_entry(tmp, struct obd_type, typ_chain);
62                 if (strcmp(type->typ_name, name) == 0) {
63                         spin_unlock(&obd_types_lock);
64                         return type;
65                 }
66         }
67         spin_unlock(&obd_types_lock);
68         return NULL;
69 }
70
71 struct obd_type *class_get_type(char *name)
72 {
73         struct obd_type *type = class_search_type(name);
74
75 #ifdef CONFIG_KMOD
76         if (!type) {
77                 char *modname = name;
78                 if (strcmp(modname, LUSTRE_MDT_NAME) == 0) 
79                         modname = LUSTRE_MDS_NAME;
80                 if (!request_module(modname)) {
81                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
82                         type = class_search_type(name);
83                 } else {
84                         LCONSOLE_ERROR("Can't load module '%s'\n", modname);
85                 }
86         }
87 #endif
88         if (type)
89                 try_module_get(type->typ_ops->o_owner);
90         return type;
91 }
92
93 void class_put_type(struct obd_type *type)
94 {
95         LASSERT(type);
96         module_put(type->typ_ops->o_owner);
97 }
98
99 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
100                         char *name)
101 {
102         struct obd_type *type;
103         int rc = 0;
104         ENTRY;
105
106         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
107
108         if (class_search_type(name)) {
109                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
110                 RETURN(-EEXIST);
111         }
112
113         rc = -ENOMEM;
114         OBD_ALLOC(type, sizeof(*type));
115         if (type == NULL)
116                 RETURN(rc);
117
118         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
119         OBD_ALLOC(type->typ_name, strlen(name) + 1);
120         if (type->typ_ops == NULL || type->typ_name == NULL)
121                 GOTO (failed, rc);
122
123         *(type->typ_ops) = *ops;
124         strcpy(type->typ_name, name);
125
126 #ifdef LPROCFS
127         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
128                                               vars, type);
129         if (IS_ERR(type->typ_procroot)) {
130                 rc = PTR_ERR(type->typ_procroot);
131                 type->typ_procroot = NULL;
132                 GOTO (failed, rc);
133         }
134 #endif
135
136         spin_lock(&obd_types_lock);
137         list_add(&type->typ_chain, &obd_types);
138         spin_unlock(&obd_types_lock);
139
140         RETURN (0);
141
142  failed:
143         if (type->typ_name != NULL)
144                 OBD_FREE(type->typ_name, strlen(name) + 1);
145         if (type->typ_ops != NULL)
146                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
147         OBD_FREE(type, sizeof(*type));
148         RETURN(rc);
149 }
150
151 int class_unregister_type(char *name)
152 {
153         struct obd_type *type = class_search_type(name);
154         ENTRY;
155
156         if (!type) {
157                 CERROR("unknown obd type\n");
158                 RETURN(-EINVAL);
159         }
160
161         if (type->typ_refcnt) {
162                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
163                 /* This is a bad situation, let's make the best of it */
164                 /* Remove ops, but leave the name for debugging */
165                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
166                 RETURN(-EBUSY);
167         }
168
169         if (type->typ_procroot) {
170                 lprocfs_remove(type->typ_procroot);
171                 type->typ_procroot = NULL;
172         }
173
174         spin_lock(&obd_types_lock);
175         list_del(&type->typ_chain);
176         spin_unlock(&obd_types_lock);
177         OBD_FREE(type->typ_name, strlen(name) + 1);
178         if (type->typ_ops != NULL)
179                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
180         OBD_FREE(type, sizeof(*type));
181         RETURN(0);
182 } /* class_unregister_type */
183
184 struct obd_device *class_newdev(struct obd_type *type, char *name)
185 {
186         struct obd_device *result = NULL;
187         int i;
188
189         spin_lock(&obd_dev_lock);
190         for (i = 0 ; i < MAX_OBD_DEVICES; i++) {
191                 struct obd_device *obd = &obd_dev[i];
192                 if (obd->obd_name && (strcmp(name, obd->obd_name) == 0)) {
193                         CERROR("Device %s already exists, won't add\n", name);
194                         if (result) {
195                                 result->obd_type = NULL;
196                                 result->obd_name = NULL;
197                                 result = NULL;
198                         }
199                         break;
200                 }
201                 if (!result && !obd->obd_type) {
202                         LASSERT(obd->obd_minor == i);
203                         memset(obd, 0, sizeof(*obd));
204                         obd->obd_minor = i;
205                         obd->obd_type = type;
206                         obd->obd_name = name;
207                         CDEBUG(D_IOCTL, "Adding new device %s\n",
208                                obd->obd_name);
209                         result = obd;
210                 }
211         }
212         spin_unlock(&obd_dev_lock);
213         return result;
214 }
215
216 void class_release_dev(struct obd_device *obd)
217 {
218         int minor = obd->obd_minor;
219
220         spin_lock(&obd_dev_lock);
221         memset(obd, 0x5a, sizeof(*obd));
222         obd->obd_type = NULL;
223         obd->obd_minor = minor;
224         obd->obd_name = NULL;
225         spin_unlock(&obd_dev_lock);
226 }
227
228 int class_name2dev(char *name)
229 {
230         int i;
231
232         if (!name)
233                 return -1;
234
235         spin_lock(&obd_dev_lock);
236         for (i = 0; i < MAX_OBD_DEVICES; i++) {
237                 struct obd_device *obd = &obd_dev[i];
238                 if (obd->obd_name && strcmp(name, obd->obd_name) == 0) {
239                         /* Make sure we finished attaching before we give
240                            out any references */
241                         if (obd->obd_attached) {
242                                 spin_unlock(&obd_dev_lock);
243                                 return i;
244                         }
245                         break;
246                 }
247         }
248         spin_unlock(&obd_dev_lock);
249
250         return -1;
251 }
252
253 struct obd_device *class_name2obd(char *name)
254 {
255         int dev = class_name2dev(name);
256         if (dev < 0)
257                 return NULL;
258         return &obd_dev[dev];
259 }
260
261 int class_uuid2dev(struct obd_uuid *uuid)
262 {
263         int i;
264
265         spin_lock(&obd_dev_lock);
266         for (i = 0; i < MAX_OBD_DEVICES; i++) {
267                 struct obd_device *obd = &obd_dev[i];
268                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
269                         spin_unlock(&obd_dev_lock);
270                         return i;
271                 }
272         }
273         spin_unlock(&obd_dev_lock);
274
275         return -1;
276 }
277
278 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
279 {
280         int dev = class_uuid2dev(uuid);
281         if (dev < 0)
282                 return NULL;
283         return &obd_dev[dev];
284 }
285
286 void class_obd_list(void)
287 {
288         char *status;
289         int i;
290
291         spin_lock(&obd_dev_lock);
292         for (i = 0; i < MAX_OBD_DEVICES; i++) {
293                 struct obd_device *obd = &obd_dev[i];
294                 if (obd->obd_type == NULL)
295                         continue;
296                 if (obd->obd_stopping)
297                         status = "ST";
298                 else if (obd->obd_set_up)
299                         status = "UP";
300                 else if (obd->obd_attached)
301                         status = "AT";
302                 else
303                         status = "--";
304                 LCONSOLE(D_WARNING, "%3d %s %s %s %s %d\n",
305                          i, status, obd->obd_type->typ_name,
306                          obd->obd_name, obd->obd_uuid.uuid,
307                          atomic_read(&obd->obd_refcount));
308         }
309         spin_unlock(&obd_dev_lock);
310         return;
311 }
312
313 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
314    specified, then only the client with that uuid is returned,
315    otherwise any client connected to the tgt is returned. */
316 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
317                                           char * typ_name,
318                                           struct obd_uuid *grp_uuid)
319 {
320         int i;
321
322         spin_lock(&obd_dev_lock);
323         for (i = 0; i < MAX_OBD_DEVICES; i++) {
324                 struct obd_device *obd = &obd_dev[i];
325                 if (obd->obd_type == NULL)
326                         continue;
327                 if ((strncmp(obd->obd_type->typ_name, typ_name,
328                              strlen(typ_name)) == 0)) {
329                         struct client_obd *cli = &obd->u.cli;
330                         struct obd_import *imp = cli->cl_import;
331                         if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) &&
332                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
333                                                          &obd->obd_uuid) : 1)) {
334                                 spin_unlock(&obd_dev_lock);
335                                 return obd;
336                         }
337                 }
338         }
339         spin_unlock(&obd_dev_lock);
340
341         return NULL;
342 }
343
344 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
345                                             struct obd_uuid *grp_uuid)
346 {
347         struct obd_device *obd;
348
349         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
350         if (!obd)
351                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
352                                             grp_uuid);
353         return obd;
354 }
355
356 /* Iterate the obd_device list looking devices have grp_uuid. Start
357    searching at *next, and if a device is found, the next index to look
358    at is saved in *next. If next is NULL, then the first matching device
359    will always be returned. */
360 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
361 {
362         int i;
363
364         if (next == NULL)
365                 i = 0;
366         else if (*next >= 0 && *next < MAX_OBD_DEVICES)
367                 i = *next;
368         else
369                 return NULL;
370
371         spin_lock(&obd_dev_lock);
372         for (; i < MAX_OBD_DEVICES; i++) {
373                 struct obd_device *obd = &obd_dev[i];
374                 if (obd->obd_type == NULL)
375                         continue;
376                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
377                         if (next != NULL)
378                                 *next = i+1;
379                         spin_unlock(&obd_dev_lock);
380                         return obd;
381                 }
382         }
383         spin_unlock(&obd_dev_lock);
384
385         return NULL;
386 }
387
388
389 void obd_cleanup_caches(void)
390 {
391         ENTRY;
392         if (obdo_cachep) {
393                 LASSERTF(kmem_cache_destroy(obdo_cachep) == 0,
394                          "Cannot destory ll_obdo_cache\n");
395                 obdo_cachep = NULL;
396         }
397         if (import_cachep) {
398                 LASSERTF(kmem_cache_destroy(import_cachep) == 0,
399                          "Cannot destory ll_import_cache\n");
400                 import_cachep = NULL;
401         }
402         EXIT;
403 }
404
405 int obd_init_caches(void)
406 {
407         ENTRY;
408
409         LASSERT(obdo_cachep == NULL);
410         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
411                                         0, 0, NULL, NULL);
412         if (!obdo_cachep)
413                 GOTO(out, -ENOMEM);
414
415         LASSERT(import_cachep == NULL);
416         import_cachep = kmem_cache_create("ll_import_cache",
417                                           sizeof(struct obd_import),
418                                           0, 0, NULL, NULL);
419         if (!import_cachep)
420                 GOTO(out, -ENOMEM);
421
422         RETURN(0);
423  out:
424         obd_cleanup_caches();
425         RETURN(-ENOMEM);
426
427 }
428
429 /* map connection to client */
430 struct obd_export *class_conn2export(struct lustre_handle *conn)
431 {
432         struct obd_export *export;
433         ENTRY;
434
435         if (!conn) {
436                 CDEBUG(D_CACHE, "looking for null handle\n");
437                 RETURN(NULL);
438         }
439
440         if (conn->cookie == -1) {  /* this means assign a new connection */
441                 CDEBUG(D_CACHE, "want a new connection\n");
442                 RETURN(NULL);
443         }
444
445         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
446         export = class_handle2object(conn->cookie);
447         RETURN(export);
448 }
449
450 struct obd_device *class_exp2obd(struct obd_export *exp)
451 {
452         if (exp)
453                 return exp->exp_obd;
454         return NULL;
455 }
456
457 struct obd_device *class_conn2obd(struct lustre_handle *conn)
458 {
459         struct obd_export *export;
460         export = class_conn2export(conn);
461         if (export) {
462                 struct obd_device *obd = export->exp_obd;
463                 class_export_put(export);
464                 return obd;
465         }
466         return NULL;
467 }
468
469 struct obd_import *class_exp2cliimp(struct obd_export *exp)
470 {
471         struct obd_device *obd = exp->exp_obd;
472         if (obd == NULL)
473                 return NULL;
474         return obd->u.cli.cl_import;
475 }
476
477 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
478 {
479         struct obd_device *obd = class_conn2obd(conn);
480         if (obd == NULL)
481                 return NULL;
482         return obd->u.cli.cl_import;
483 }
484
485 /* Export management functions */
/*
 * Add-reference callback handed to class_handle_hash() for exports; the
 * untyped argument is the export itself.
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
490
491 void __class_export_put(struct obd_export *exp)
492 {
493         if (atomic_dec_and_test(&exp->exp_refcount)) {
494                 struct obd_device *obd = exp->exp_obd;
495                 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
496                        exp->exp_client_uuid.uuid);
497
498                 LASSERT(obd != NULL);
499
500                 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
501                 if (exp->exp_connection)
502                         ptlrpc_put_connection_superhack(exp->exp_connection);
503
504                 LASSERT(list_empty(&exp->exp_outstanding_replies));
505                 LASSERT(list_empty(&exp->exp_handle.h_link));
506                 obd_destroy_export(exp);
507
508                 OBD_FREE(exp, sizeof(*exp));
509                 class_decref(obd);
510         }
511 }
512 EXPORT_SYMBOL(__class_export_put);
513
514 /* Creates a new export, adds it to the hash table, and returns a
515  * pointer to it. The refcount is 2: one for the hash reference, and
516  * one for the pointer returned by this function. */
517 struct obd_export *class_new_export(struct obd_device *obd)
518 {
519         struct obd_export *export;
520
521         OBD_ALLOC(export, sizeof(*export));
522         if (!export) {
523                 CERROR("no memory! (minor %d)\n", obd->obd_minor);
524                 return NULL;
525         }
526
527         export->exp_conn_cnt = 0;
528         atomic_set(&export->exp_refcount, 2);
529         export->exp_obd = obd;
530         INIT_LIST_HEAD(&export->exp_outstanding_replies);
531         /* XXX this should be in LDLM init */
532         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
533
534         INIT_LIST_HEAD(&export->exp_handle.h_link);
535         class_handle_hash(&export->exp_handle, export_handle_addref);
536         export->exp_last_request_time = CURRENT_SECONDS;
537         spin_lock_init(&export->exp_lock);
538
539         spin_lock(&obd->obd_dev_lock);
540         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
541         atomic_inc(&obd->obd_refcount);
542         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
543         list_add_tail(&export->exp_obd_chain_timed,
544                       &export->exp_obd->obd_exports_timed);
545         export->exp_obd->obd_num_exports++;
546         spin_unlock(&obd->obd_dev_lock);
547
548         obd_init_export(export);
549         return export;
550 }
551 EXPORT_SYMBOL(class_new_export);
552
553 void class_unlink_export(struct obd_export *exp)
554 {
555         class_handle_unhash(&exp->exp_handle);
556
557         spin_lock(&exp->exp_obd->obd_dev_lock);
558         list_del_init(&exp->exp_obd_chain);
559         list_del_init(&exp->exp_obd_chain_timed);
560         exp->exp_obd->obd_num_exports--;
561         spin_unlock(&exp->exp_obd->obd_dev_lock);
562
563         class_export_put(exp);
564 }
565 EXPORT_SYMBOL(class_unlink_export);
566
567 /* Import management functions */
/*
 * Add-reference callback handed to class_handle_hash() for imports; the
 * untyped argument is the import itself.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
572
573 struct obd_import *class_import_get(struct obd_import *import)
574 {
575         LASSERT(atomic_read(&import->imp_refcount) >= 0);
576         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
577         atomic_inc(&import->imp_refcount);
578         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
579                atomic_read(&import->imp_refcount));
580         return import;
581 }
582 EXPORT_SYMBOL(class_import_get);
583
584 void class_import_put(struct obd_import *import)
585 {
586         ENTRY;
587
588         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
589                atomic_read(&import->imp_refcount) - 1);
590
591         LASSERT(atomic_read(&import->imp_refcount) > 0);
592         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
593         if (!atomic_dec_and_test(&import->imp_refcount)) {
594                 EXIT;
595                 return;
596         }
597
598         CDEBUG(D_IOCTL, "destroying import %p\n", import);
599
600         ptlrpc_put_connection_superhack(import->imp_connection);
601
602         while (!list_empty(&import->imp_conn_list)) {
603                 struct obd_import_conn *imp_conn;
604
605                 imp_conn = list_entry(import->imp_conn_list.next,
606                                       struct obd_import_conn, oic_item);
607                 list_del(&imp_conn->oic_item);
608                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
609                 OBD_FREE(imp_conn, sizeof(*imp_conn));
610         }
611
612         LASSERT(list_empty(&import->imp_handle.h_link));
613         OBD_FREE(import, sizeof(*import));
614         EXIT;
615 }
616 EXPORT_SYMBOL(class_import_put);
617
618 struct obd_import *class_new_import(void)
619 {
620         struct obd_import *imp;
621
622         OBD_ALLOC(imp, sizeof(*imp));
623         if (imp == NULL)
624                 return NULL;
625
626         INIT_LIST_HEAD(&imp->imp_replay_list);
627         INIT_LIST_HEAD(&imp->imp_sending_list);
628         INIT_LIST_HEAD(&imp->imp_delayed_list);
629         spin_lock_init(&imp->imp_lock);
630         imp->imp_conn_cnt = 0;
631         imp->imp_max_transno = 0;
632         imp->imp_peer_committed_transno = 0;
633         imp->imp_state = LUSTRE_IMP_NEW;
634         init_waitqueue_head(&imp->imp_recovery_waitq);
635
636         atomic_set(&imp->imp_refcount, 2);
637         atomic_set(&imp->imp_inflight, 0);
638         atomic_set(&imp->imp_replay_inflight, 0);
639         INIT_LIST_HEAD(&imp->imp_conn_list);
640         INIT_LIST_HEAD(&imp->imp_handle.h_link);
641         class_handle_hash(&imp->imp_handle, import_handle_addref);
642
643         return imp;
644 }
645 EXPORT_SYMBOL(class_new_import);
646
647 void class_destroy_import(struct obd_import *import)
648 {
649         LASSERT(import != NULL);
650         LASSERT(import != LP_POISON);
651
652         class_handle_unhash(&import->imp_handle);
653
654         /* Abort any inflight DLM requests and NULL out their (about to be
655          * freed) import. */
656         /* Invalidate all requests on import, would be better to call
657            ptlrpc_set_import_active(imp, 0); */
658         import->imp_generation++;
659         ptlrpc_abort_inflight_superhack(import);
660
661         class_import_put(import);
662 }
663 EXPORT_SYMBOL(class_destroy_import);
664
665 /* A connection defines an export context in which preallocation can
666    be managed. This releases the export pointer reference, and returns
667    the export handle, so the export refcount is 1 when this function
668    returns. */
669 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
670                   struct obd_uuid *cluuid)
671 {
672         struct obd_export *export;
673         LASSERT(conn != NULL);
674         LASSERT(obd != NULL);
675         LASSERT(cluuid != NULL);
676         ENTRY;
677
678         export = class_new_export(obd);
679         if (export == NULL)
680                 RETURN(-ENOMEM);
681
682         conn->cookie = export->exp_handle.h_cookie;
683         memcpy(&export->exp_client_uuid, cluuid,
684                sizeof(export->exp_client_uuid));
685         class_export_put(export);
686
687         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
688                cluuid->uuid, conn->cookie);
689         RETURN(0);
690 }
691 EXPORT_SYMBOL(class_connect);
692
693 /* This function removes two references from the export: one for the
694  * hash entry and one for the export pointer passed in.  The export
695  * pointer passed to this function is destroyed should not be used
696  * again. */
697 int class_disconnect(struct obd_export *export)
698 {
699         int already_disconnected;
700         ENTRY;
701
702         if (export == NULL) {
703                 fixme();
704                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
705                 RETURN(-EINVAL);
706         }
707
708         spin_lock(&export->exp_lock);
709         already_disconnected = export->exp_disconnected;
710         export->exp_disconnected = 1;
711         spin_unlock(&export->exp_lock);
712
713         /* class_cleanup(), abort_recovery(), and class_fail_export()
714          * all end up in here, and if any of them race we shouldn't
715          * call extra class_export_puts(). */
716         if (already_disconnected)
717                 RETURN(0);
718
719         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
720                export->exp_handle.h_cookie);
721
722         class_unlink_export(export);
723         class_export_put(export);
724         RETURN(0);
725 }
726
727 static void class_disconnect_export_list(struct list_head *list, int flags)
728 {
729         int rc;
730         struct lustre_handle fake_conn;
731         struct obd_export *fake_exp, *exp;
732         ENTRY;
733
734         /* It's possible that an export may disconnect itself, but
735          * nothing else will be added to this list. */
736         while(!list_empty(list)) {
737                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
738                 class_export_get(exp);
739                 exp->exp_flags = flags;
740
741                 if (obd_uuid_equals(&exp->exp_client_uuid,
742                                     &exp->exp_obd->obd_uuid)) {
743                         CDEBUG(D_HA,
744                                "exp %p export uuid == obd uuid, don't discon\n",
745                                exp);
746                         /* Need to delete this now so we don't end up pointing
747                          * to work_list later when this export is cleaned up. */
748                         list_del_init(&exp->exp_obd_chain);
749                         class_export_put(exp);
750                         continue;
751                 }
752
753                 fake_conn.cookie = exp->exp_handle.h_cookie;
754                 fake_exp = class_conn2export(&fake_conn);
755                 if (!fake_exp) {
756                         class_export_put(exp);
757                         continue;
758                 }
759                 fake_exp->exp_flags = flags;
760                 rc = obd_disconnect(fake_exp);
761                 class_export_put(exp);
762                 if (rc) {
763                         CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
764                                exp, rc);
765                 } else {
766                         CDEBUG(D_HA, "export %p disconnected\n", exp);
767                 }
768         }
769         EXIT;
770 }
771
772 static inline int get_exp_flags_from_obd(struct obd_device *obd)
773 {
774         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
775                 (obd->obd_force ? OBD_OPT_FORCE : 0));
776 }
777
778 void class_disconnect_exports(struct obd_device *obd)
779 {
780         struct list_head work_list;
781         ENTRY;
782
783         /* Move all of the exports from obd_exports to a work list, en masse. */
784         spin_lock(&obd->obd_dev_lock);
785         list_add(&work_list, &obd->obd_exports);
786         list_del_init(&obd->obd_exports);
787         spin_unlock(&obd->obd_dev_lock);
788
789         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
790                "disconnecting them\n", obd->obd_minor, obd);
791         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
792         EXIT;
793 }
794 EXPORT_SYMBOL(class_disconnect_exports);
795
796 /* Remove exports that have not completed recovery.
797  */
798 void class_disconnect_stale_exports(struct obd_device *obd)
799 {
800         struct list_head work_list;
801         struct list_head *pos, *n;
802         struct obd_export *exp;
803         int cnt = 0;
804         ENTRY;
805
806         INIT_LIST_HEAD(&work_list);
807         spin_lock(&obd->obd_dev_lock);
808         list_for_each_safe(pos, n, &obd->obd_exports) {
809                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
810                 if (exp->exp_replay_needed) {
811                         list_del(&exp->exp_obd_chain);
812                         list_add(&exp->exp_obd_chain, &work_list);
813                         cnt++;
814                 }
815         }
816         spin_unlock(&obd->obd_dev_lock);
817
818         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
819                obd->obd_name, cnt);
820         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
821         EXIT;
822 }
823 EXPORT_SYMBOL(class_disconnect_stale_exports);
824
825 int oig_init(struct obd_io_group **oig_out)
826 {
827         struct obd_io_group *oig;
828         ENTRY;
829
830         OBD_ALLOC(oig, sizeof(*oig));
831         if (oig == NULL)
832                 RETURN(-ENOMEM);
833
834         spin_lock_init(&oig->oig_lock);
835         oig->oig_rc = 0;
836         oig->oig_pending = 0;
837         atomic_set(&oig->oig_refcount, 1);
838         init_waitqueue_head(&oig->oig_waitq);
839         INIT_LIST_HEAD(&oig->oig_occ_list);
840
841         *oig_out = oig;
842         RETURN(0);
843 };
844 EXPORT_SYMBOL(oig_init);
845
846 static inline void oig_grab(struct obd_io_group *oig)
847 {
848         atomic_inc(&oig->oig_refcount);
849 }
850
851 void oig_release(struct obd_io_group *oig)
852 {
853         if (atomic_dec_and_test(&oig->oig_refcount))
854                 OBD_FREE(oig, sizeof(*oig));
855 }
856 EXPORT_SYMBOL(oig_release);
857
858 void oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
859 {
860         unsigned long flags;
861         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
862         spin_lock_irqsave(&oig->oig_lock, flags);
863         oig->oig_pending++;
864         if (occ != NULL)
865                 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
866         spin_unlock_irqrestore(&oig->oig_lock, flags);
867         oig_grab(oig);
868 }
869 EXPORT_SYMBOL(oig_add_one);
870
871 void oig_complete_one(struct obd_io_group *oig,
872                       struct oig_callback_context *occ, int rc)
873 {
874         unsigned long flags;
875         wait_queue_head_t *wake = NULL;
876         int old_rc;
877
878         spin_lock_irqsave(&oig->oig_lock, flags);
879
880         if (occ != NULL)
881                 list_del_init(&occ->occ_oig_item);
882
883         old_rc = oig->oig_rc;
884         if (oig->oig_rc == 0 && rc != 0)
885                 oig->oig_rc = rc;
886
887         if (--oig->oig_pending <= 0)
888                 wake = &oig->oig_waitq;
889
890         spin_unlock_irqrestore(&oig->oig_lock, flags);
891
892         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
893                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
894                         oig->oig_pending);
895         if (wake)
896                 wake_up(wake);
897         oig_release(oig);
898 }
899 EXPORT_SYMBOL(oig_complete_one);
900
901 static int oig_done(struct obd_io_group *oig)
902 {
903         unsigned long flags;
904         int rc = 0;
905         spin_lock_irqsave(&oig->oig_lock, flags);
906         if (oig->oig_pending <= 0)
907                 rc = 1;
908         spin_unlock_irqrestore(&oig->oig_lock, flags);
909         return rc;
910 }
911
912 static void interrupted_oig(void *data)
913 {
914         struct obd_io_group *oig = data;
915         struct oig_callback_context *occ;
916         unsigned long flags;
917
918         spin_lock_irqsave(&oig->oig_lock, flags);
919         /* We need to restart the processing each time we drop the lock, as
920          * it is possible other threads called oig_complete_one() to remove
921          * an entry elsewhere in the list while we dropped lock.  We need to
922          * drop the lock because osc_ap_completion() calls oig_complete_one()
923          * which re-gets this lock ;-) as well as a lock ordering issue. */
924 restart:
925         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
926                 if (occ->interrupted)
927                         continue;
928                 occ->interrupted = 1;
929                 spin_unlock_irqrestore(&oig->oig_lock, flags);
930                 occ->occ_interrupted(occ);
931                 spin_lock_irqsave(&oig->oig_lock, flags);
932                 goto restart;
933         }
934         spin_unlock_irqrestore(&oig->oig_lock, flags);
935 }
936
937 int oig_wait(struct obd_io_group *oig)
938 {
939         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
940         int rc;
941
942         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
943
944         do {
945                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
946                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
947                 /* we can't continue until the oig has emptied and stopped
948                  * referencing state that the caller will free upon return */
949                 if (rc == -EINTR)
950                         lwi = (struct l_wait_info){ 0, };
951         } while (rc == -EINTR);
952
953         LASSERTF(oig->oig_pending == 0,
954                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
955                  oig->oig_pending);
956
957         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
958         return oig->oig_rc;
959 }
960 EXPORT_SYMBOL(oig_wait);
961
962 void class_fail_export(struct obd_export *exp)
963 {
964         int rc, already_failed;
965         unsigned long flags;
966
967         spin_lock_irqsave(&exp->exp_lock, flags);
968         already_failed = exp->exp_failed;
969         exp->exp_failed = 1;
970         spin_unlock_irqrestore(&exp->exp_lock, flags);
971
972         if (already_failed) {
973                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
974                        exp, exp->exp_client_uuid.uuid);
975                 return;
976         }
977
978         CDEBUG(D_HA, "disconnecting export %p/%s\n",
979                exp, exp->exp_client_uuid.uuid);
980
981         if (obd_dump_on_timeout)
982                 libcfs_debug_dumplog();
983
984         /* Most callers into obd_disconnect are removing their own reference
985          * (request, for example) in addition to the one from the hash table.
986          * We don't have such a reference here, so make one. */
987         class_export_get(exp);
988         rc = obd_disconnect(exp);
989         if (rc)
990                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
991         else
992                 CDEBUG(D_HA, "disconnected export %p/%s\n",
993                        exp, exp->exp_client_uuid.uuid);
994 }
995 EXPORT_SYMBOL(class_fail_export);
996
997 char *obd_export_nid2str(struct obd_export *exp)
998 {
999         if (exp->exp_connection != NULL)
1000                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1001         
1002         return "(no nid)";
1003 }
1004 EXPORT_SYMBOL(obd_export_nid2str);
1005
1006 /* Ping evictor thread */
1007 #ifdef __KERNEL__
1008 #define PET_READY     1
1009 #define PET_TERMINATE 2
1010
1011 static int               pet_refcount = 0;
1012 static int               pet_state;
1013 static wait_queue_head_t pet_waitq;
1014 static struct obd_export *pet_exp = NULL;
1015 static spinlock_t        pet_lock = SPIN_LOCK_UNLOCKED;
1016
1017 static int ping_evictor_wake(struct obd_export *exp)
1018 {
1019         spin_lock(&pet_lock);
1020         if (pet_exp) {
1021                 /* eventually the new obd will call here again. */
1022                 spin_unlock(&pet_lock);
1023                 return 1;
1024         }
1025
1026         /* We have to make sure the obd isn't destroyed between now and when
1027          * the ping evictor runs.  We'll take a reference here, and drop it
1028          * when we finish in the evictor.  We don't really care about this
1029          * export in particular; we just need one to keep the obd alive. */
1030         pet_exp = class_export_get(exp);
1031         spin_unlock(&pet_lock);
1032
1033         wake_up(&pet_waitq);
1034         return 0;
1035 }
1036
1037 static int ping_evictor_main(void *arg)
1038 {
1039         struct obd_device *obd;
1040         struct obd_export *exp;
1041         struct l_wait_info lwi = { 0 };
1042         time_t expire_time;
1043         unsigned long flags;
1044         ENTRY;
1045
1046         lock_kernel();
1047
1048         /* ptlrpc_daemonize() */
1049         exit_mm(current);
1050         lustre_daemonize_helper();
1051         set_fs_pwd(current->fs, init_task.fs->pwdmnt, init_task.fs->pwd);
1052         exit_files(current);
1053         reparent_to_init();
1054         THREAD_NAME(current->comm, sizeof(current->comm), "ping_evictor");
1055
1056         SIGNAL_MASK_LOCK(current, flags);
1057         sigfillset(&current->blocked);
1058         RECALC_SIGPENDING;
1059         SIGNAL_MASK_UNLOCK(current, flags);
1060         unlock_kernel();
1061
1062         CDEBUG(D_HA, "Starting Ping Evictor\n");
1063         pet_exp = NULL;
1064         pet_state = PET_READY;
1065         while (1) {
1066                 l_wait_event(pet_waitq, pet_exp ||
1067                              (pet_state == PET_TERMINATE), &lwi);
1068                 if (pet_state == PET_TERMINATE)
1069                         break;
1070
1071                 /* we only get here if pet_exp != NULL, and the end of this
1072                  * loop is the only place which sets it NULL again, so lock
1073                  * is not strictly necessary. */
1074                 spin_lock(&pet_lock);
1075                 obd = pet_exp->exp_obd;
1076                 spin_unlock(&pet_lock);
1077
1078                 expire_time = CURRENT_SECONDS - (3 * obd_timeout / 2);
1079
1080                 CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
1081                        obd->obd_name, expire_time);
1082
1083                 /* Exports can't be deleted out of the list while we hold
1084                  * the obd lock (class_unlink_export), which means we can't
1085                  * lose the last ref on the export.  If they've already been
1086                  * removed from the list, we won't find them here. */
1087                 spin_lock(&obd->obd_dev_lock);
1088                 while (!list_empty(&obd->obd_exports_timed)) {
1089                         exp = list_entry(obd->obd_exports_timed.next,
1090                                          struct obd_export,exp_obd_chain_timed);
1091
1092                         if (expire_time > exp->exp_last_request_time) {
1093                                 class_export_get(exp);
1094                                 spin_unlock(&obd->obd_dev_lock);
1095                                 LCONSOLE_WARN("%s: haven't heard from %s in %ld"
1096                                               " seconds. Last request was at %ld. "
1097                                               "I think it's dead, and I am evicting "
1098                                               "it.\n", obd->obd_name,
1099                                               obd_export_nid2str(exp),
1100                                               (long)(CURRENT_SECONDS -
1101                                                      exp->exp_last_request_time),
1102                                               exp->exp_last_request_time);
1103
1104
1105                                 class_fail_export(exp);
1106                                 class_export_put(exp);
1107
1108                                 spin_lock(&obd->obd_dev_lock);
1109                         } else {
1110                                 /* List is sorted, so everyone below is ok */
1111                                 break;
1112                         }
1113                 }
1114                 spin_unlock(&obd->obd_dev_lock);
1115
1116                 class_export_put(pet_exp);
1117
1118                 spin_lock(&pet_lock);
1119                 pet_exp = NULL;
1120                 spin_unlock(&pet_lock);
1121         }
1122         CDEBUG(D_HA, "Exiting Ping Evictor\n");
1123
1124         RETURN(0);
1125 }
1126
1127 void ping_evictor_start(void)
1128 {
1129         int rc;
1130
1131         if (++pet_refcount > 1)
1132                 return;
1133
1134         init_waitqueue_head(&pet_waitq);
1135
1136         rc = kernel_thread(ping_evictor_main, NULL, CLONE_VM | CLONE_FS);
1137         if (rc < 0) {
1138                 pet_refcount--;
1139                 CERROR("Cannot start ping evictor thread: %d\n", rc);
1140         }
1141 }
1142 EXPORT_SYMBOL(ping_evictor_start);
1143
1144 void ping_evictor_stop(void)
1145 {
1146         if (--pet_refcount > 0)
1147                 return;
1148
1149         pet_state = PET_TERMINATE;
1150         wake_up(&pet_waitq);
1151 }
1152 EXPORT_SYMBOL(ping_evictor_stop);
1153 #else /* !__KERNEL__ */
1154 #define ping_evictor_wake(exp)     1
1155 #endif
1156
1157 /* This function makes sure dead exports are evicted in a timely manner.
1158    This function is only called when some export receives a message (i.e.,
1159    the network is up.) */
1160 void class_update_export_timer(struct obd_export *exp, time_t extra_delay)
1161 {
1162         struct obd_export *oldest_exp;
1163         time_t oldest_time;
1164
1165         ENTRY;
1166
1167         LASSERT(exp);
1168
1169         /* Compensate for slow machines, etc, by faking our request time
1170            into the future.  Although this can break the strict time-ordering
1171            of the list, we can be really lazy here - we don't have to evict
1172            at the exact right moment.  Eventually, all silent exports
1173            will make it to the top of the list. */
1174         exp->exp_last_request_time = max(exp->exp_last_request_time,
1175                                          (time_t)CURRENT_SECONDS + extra_delay);
1176
1177         CDEBUG(D_INFO, "updating export %s at %ld\n",
1178                exp->exp_client_uuid.uuid,
1179                exp->exp_last_request_time);
1180
1181         /* exports may get disconnected from the chain even though the
1182            export has references, so we must keep the spin lock while
1183            manipulating the lists */
1184         spin_lock(&exp->exp_obd->obd_dev_lock);
1185
1186         if (list_empty(&exp->exp_obd_chain_timed)) {
1187                 /* this one is not timed */
1188                 spin_unlock(&exp->exp_obd->obd_dev_lock);
1189                 EXIT;
1190                 return;
1191         }
1192
1193         list_move_tail(&exp->exp_obd_chain_timed,
1194                        &exp->exp_obd->obd_exports_timed);
1195
1196         oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
1197                                 struct obd_export, exp_obd_chain_timed);
1198         oldest_time = oldest_exp->exp_last_request_time;
1199         spin_unlock(&exp->exp_obd->obd_dev_lock);
1200
1201         if (exp->exp_obd->obd_recovering) {
1202                 /* be nice to everyone during recovery */
1203                 EXIT;
1204                 return;
1205         }
1206
1207         /* Note - racing to start/reset the obd_eviction timer is safe */
1208         if (exp->exp_obd->obd_eviction_timer == 0) {
1209                 /* Check if the oldest entry is expired. */
1210                 if (CURRENT_SECONDS > (oldest_time +
1211                                        (3 * obd_timeout / 2) + extra_delay)) {
1212                         /* We need a second timer, in case the net was down and
1213                          * it just came back. Since the pinger may skip every
1214                          * other PING_INTERVAL (see note in ptlrpc_pinger_main),
1215                          * we better wait for 3. */
1216                         exp->exp_obd->obd_eviction_timer = CURRENT_SECONDS +
1217                                 3 * PING_INTERVAL;
1218                         CDEBUG(D_HA, "%s: Think about evicting %s from %ld\n",
1219                                exp->exp_obd->obd_name, obd_export_nid2str(exp),
1220                                oldest_time);
1221                 }
1222         } else {
1223                 if (CURRENT_SECONDS > (exp->exp_obd->obd_eviction_timer +
1224                                        extra_delay)) {
1225                         /* The evictor won't evict anyone who we've heard from
1226                          * recently, so we don't have to check before we start
1227                          * it. */
1228                         if (!ping_evictor_wake(exp))
1229                                 exp->exp_obd->obd_eviction_timer = 0;
1230                 }
1231         }
1232
1233         EXIT;
1234 }
1235 EXPORT_SYMBOL(class_update_export_timer);
1236
1237 #define EVICT_BATCH 32
1238 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1239 {
1240         struct obd_export *doomed_exp[EVICT_BATCH] = { NULL };
1241         struct list_head *p;
1242         int exports_evicted = 0, num_to_evict = 0, i;
1243
1244 search_again:
1245         spin_lock(&obd->obd_dev_lock);
1246         list_for_each(p, &obd->obd_exports) {
1247                 doomed_exp[num_to_evict] = list_entry(p, struct obd_export,
1248                                                       exp_obd_chain);
1249                 if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]), nid)
1250                     == 0) {
1251                         class_export_get(doomed_exp[num_to_evict]);
1252                         if (++num_to_evict == EVICT_BATCH)
1253                                 break;
1254                 }
1255         }
1256         spin_unlock(&obd->obd_dev_lock);
1257
1258         for (i = 0; i < num_to_evict; i++) {
1259                 exports_evicted++;
1260                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1261                        obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid,
1262                        exports_evicted);
1263                 class_fail_export(doomed_exp[i]);
1264                 class_export_put(doomed_exp[i]);
1265         }
1266         if (num_to_evict == EVICT_BATCH) {
1267                 num_to_evict = 0;
1268                 goto search_again;
1269         }
1270
1271         if (!exports_evicted)
1272                 CERROR("%s: can't disconnect NID '%s': no exports found\n",
1273                        obd->obd_name, nid);
1274         return exports_evicted;
1275 }
1276 EXPORT_SYMBOL(obd_export_evict_by_nid);
1277
1278 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1279 {
1280         struct obd_export *doomed_exp = NULL;
1281         struct list_head *p;
1282         struct obd_uuid doomed;
1283         int exports_evicted = 0;
1284
1285         obd_str2uuid(&doomed, uuid);
1286
1287         spin_lock(&obd->obd_dev_lock);
1288         list_for_each(p, &obd->obd_exports) {
1289                 doomed_exp = list_entry(p, struct obd_export, exp_obd_chain);
1290
1291                 if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) {
1292                         class_export_get(doomed_exp);
1293                         break;
1294                 }
1295                 doomed_exp = NULL;
1296         }
1297         spin_unlock(&obd->obd_dev_lock);
1298
1299         if (doomed_exp == NULL) {
1300                 CERROR("%s: can't disconnect %s: no exports found\n",
1301                        obd->obd_name, uuid);
1302         } else {
1303                 CWARN("%s: evicting %s at adminstrative request\n",
1304                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1305                 class_fail_export(doomed_exp);
1306                 class_export_put(doomed_exp);
1307                 exports_evicted++;
1308         }
1309
1310         return exports_evicted;
1311 }
1312 EXPORT_SYMBOL(obd_export_evict_by_uuid);