Whamcloud - gitweb
LU-1943 filter: remove obdfilter from tree
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types; the definition lives outside this file. */
extern cfs_list_t obd_types;
/* Taken around every walk or modification of obd_types in this file. */
cfs_spinlock_t obd_types_lock;

/* Slab cache that struct obd_device objects are allocated from. */
cfs_mem_cache_t *obd_device_cachep;
/* Slab cache sized for struct obdo (created in obd_init_caches()). */
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
/* Slab cache sized for struct obd_import (created in obd_init_caches()). */
cfs_mem_cache_t *import_cachep;

/* NOTE(review): presumably imports/exports queued for deferred destruction,
 * with obd_zombie_impexp_lock guarding both lists -- confirm against the
 * zombie-handling code further down the file. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook called by class_export_destroy() to drop an export's connection.
 * It is only declared and exported here; presumably another module
 * installs the actual function -- TODO confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!cfs_request_module("%s", modname)) {
129                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130                         type = class_search_type(name);
131                 } else {
132                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133                                            modname);
134                 }
135         }
136 #endif
137         if (type) {
138                 cfs_spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 cfs_try_module_get(type->typ_dt_ops->o_owner);
141                 cfs_spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         cfs_spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         cfs_module_put(type->typ_dt_ops->o_owner);
153         cfs_spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165         ENTRY;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 RETURN(-EEXIST);
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 RETURN(rc);
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         cfs_spin_lock_init(&type->obd_type_lock);
195
196 #ifdef LPROCFS
197         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
198                                               vars, type);
199         if (IS_ERR(type->typ_procroot)) {
200                 rc = PTR_ERR(type->typ_procroot);
201                 type->typ_procroot = NULL;
202                 GOTO (failed, rc);
203         }
204 #endif
205         if (ldt != NULL) {
206                 type->typ_lu = ldt;
207                 rc = lu_device_type_init(ldt);
208                 if (rc != 0)
209                         GOTO (failed, rc);
210         }
211
212         cfs_spin_lock(&obd_types_lock);
213         cfs_list_add(&type->typ_chain, &obd_types);
214         cfs_spin_unlock(&obd_types_lock);
215
216         RETURN (0);
217
218  failed:
219         if (type->typ_name != NULL)
220                 OBD_FREE(type->typ_name, strlen(name) + 1);
221         if (type->typ_md_ops != NULL)
222                 OBD_FREE_PTR(type->typ_md_ops);
223         if (type->typ_dt_ops != NULL)
224                 OBD_FREE_PTR(type->typ_dt_ops);
225         OBD_FREE(type, sizeof(*type));
226         RETURN(rc);
227 }
228 EXPORT_SYMBOL(class_register_type);
229
230 int class_unregister_type(const char *name)
231 {
232         struct obd_type *type = class_search_type(name);
233         ENTRY;
234
235         if (!type) {
236                 CERROR("unknown obd type\n");
237                 RETURN(-EINVAL);
238         }
239
240         if (type->typ_refcnt) {
241                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242                 /* This is a bad situation, let's make the best of it */
243                 /* Remove ops, but leave the name for debugging */
244                 OBD_FREE_PTR(type->typ_dt_ops);
245                 OBD_FREE_PTR(type->typ_md_ops);
246                 RETURN(-EBUSY);
247         }
248
249         /* we do not use type->typ_procroot as for compatibility purposes
250          * other modules can share names (i.e. lod can use lov entry). so
251          * we can't reference pointer as it can get invalided when another
252          * module removes the entry */
253         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
254
255         if (type->typ_lu)
256                 lu_device_type_fini(type->typ_lu);
257
258         cfs_spin_lock(&obd_types_lock);
259         cfs_list_del(&type->typ_chain);
260         cfs_spin_unlock(&obd_types_lock);
261         OBD_FREE(type->typ_name, strlen(name) + 1);
262         if (type->typ_dt_ops != NULL)
263                 OBD_FREE_PTR(type->typ_dt_ops);
264         if (type->typ_md_ops != NULL)
265                 OBD_FREE_PTR(type->typ_md_ops);
266         OBD_FREE(type, sizeof(*type));
267         RETURN(0);
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
270
271 /**
272  * Create a new obd device.
273  *
274  * Find an empty slot in ::obd_devs[], create a new obd device in it.
275  *
276  * \param[in] type_name obd device type string.
277  * \param[in] name      obd device name.
278  *
279  * \retval NULL if create fails, otherwise return the obd device
280  *         pointer created.
281  */
282 struct obd_device *class_newdev(const char *type_name, const char *name)
283 {
284         struct obd_device *result = NULL;
285         struct obd_device *newdev;
286         struct obd_type *type = NULL;
287         int i;
288         int new_obd_minor = 0;
289         ENTRY;
290
291         if (strlen(name) >= MAX_OBD_NAME) {
292                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293                 RETURN(ERR_PTR(-EINVAL));
294         }
295
296         type = class_get_type(type_name);
297         if (type == NULL){
298                 CERROR("OBD: unknown type: %s\n", type_name);
299                 RETURN(ERR_PTR(-ENODEV));
300         }
301
302         newdev = obd_device_alloc();
303         if (newdev == NULL) {
304                 class_put_type(type);
305                 RETURN(ERR_PTR(-ENOMEM));
306         }
307         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
308
309         cfs_write_lock(&obd_dev_lock);
310         for (i = 0; i < class_devno_max(); i++) {
311                 struct obd_device *obd = class_num2obd(i);
312
313                 if (obd && obd->obd_name &&
314                     (strcmp(name, obd->obd_name) == 0)) {
315                         CERROR("Device %s already exists at %d, won't add\n",
316                                name, i);
317                         if (result) {
318                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
319                                          "%p obd_magic %08x != %08x\n", result,
320                                          result->obd_magic, OBD_DEVICE_MAGIC);
321                                 LASSERTF(result->obd_minor == new_obd_minor,
322                                          "%p obd_minor %d != %d\n", result,
323                                          result->obd_minor, new_obd_minor);
324
325                                 obd_devs[result->obd_minor] = NULL;
326                                 result->obd_name[0]='\0';
327                          }
328                         result = ERR_PTR(-EEXIST);
329                         break;
330                 }
331                 if (!result && !obd) {
332                         result = newdev;
333                         result->obd_minor = i;
334                         new_obd_minor = i;
335                         result->obd_type = type;
336                         strncpy(result->obd_name, name,
337                                 sizeof(result->obd_name) - 1);
338                         obd_devs[i] = result;
339                 }
340         }
341         cfs_write_unlock(&obd_dev_lock);
342
343         if (result == NULL && i >= class_devno_max()) {
344                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
345                        class_devno_max());
346                 RETURN(ERR_PTR(-EOVERFLOW));
347         }
348
349         if (IS_ERR(result)) {
350                 obd_device_free(newdev);
351                 class_put_type(type);
352         } else {
353                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
354                        result->obd_name, result);
355         }
356         RETURN(result);
357 }
358
359 void class_release_dev(struct obd_device *obd)
360 {
361         struct obd_type *obd_type = obd->obd_type;
362
363         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
364                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
365         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
366                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
367         LASSERT(obd_type != NULL);
368
369         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
370                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371
372         cfs_write_lock(&obd_dev_lock);
373         obd_devs[obd->obd_minor] = NULL;
374         cfs_write_unlock(&obd_dev_lock);
375         obd_device_free(obd);
376
377         class_put_type(obd_type);
378 }
379
380 int class_name2dev(const char *name)
381 {
382         int i;
383
384         if (!name)
385                 return -1;
386
387         cfs_read_lock(&obd_dev_lock);
388         for (i = 0; i < class_devno_max(); i++) {
389                 struct obd_device *obd = class_num2obd(i);
390
391                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
392                         /* Make sure we finished attaching before we give
393                            out any references */
394                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395                         if (obd->obd_attached) {
396                                 cfs_read_unlock(&obd_dev_lock);
397                                 return i;
398                         }
399                         break;
400                 }
401         }
402         cfs_read_unlock(&obd_dev_lock);
403
404         return -1;
405 }
406 EXPORT_SYMBOL(class_name2dev);
407
408 struct obd_device *class_name2obd(const char *name)
409 {
410         int dev = class_name2dev(name);
411
412         if (dev < 0 || dev > class_devno_max())
413                 return NULL;
414         return class_num2obd(dev);
415 }
416 EXPORT_SYMBOL(class_name2obd);
417
418 int class_uuid2dev(struct obd_uuid *uuid)
419 {
420         int i;
421
422         cfs_read_lock(&obd_dev_lock);
423         for (i = 0; i < class_devno_max(); i++) {
424                 struct obd_device *obd = class_num2obd(i);
425
426                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
427                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428                         cfs_read_unlock(&obd_dev_lock);
429                         return i;
430                 }
431         }
432         cfs_read_unlock(&obd_dev_lock);
433
434         return -1;
435 }
436 EXPORT_SYMBOL(class_uuid2dev);
437
438 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
439 {
440         int dev = class_uuid2dev(uuid);
441         if (dev < 0)
442                 return NULL;
443         return class_num2obd(dev);
444 }
445 EXPORT_SYMBOL(class_uuid2obd);
446
447 /**
448  * Get obd device from ::obd_devs[]
449  *
450  * \param num [in] array index
451  *
452  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
453  *         otherwise return the obd device there.
454  */
455 struct obd_device *class_num2obd(int num)
456 {
457         struct obd_device *obd = NULL;
458
459         if (num < class_devno_max()) {
460                 obd = obd_devs[num];
461                 if (obd == NULL)
462                         return NULL;
463
464                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
465                          "%p obd_magic %08x != %08x\n",
466                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
467                 LASSERTF(obd->obd_minor == num,
468                          "%p obd_minor %0d != %0d\n",
469                          obd, obd->obd_minor, num);
470         }
471
472         return obd;
473 }
474 EXPORT_SYMBOL(class_num2obd);
475
476 void class_obd_list(void)
477 {
478         char *status;
479         int i;
480
481         cfs_read_lock(&obd_dev_lock);
482         for (i = 0; i < class_devno_max(); i++) {
483                 struct obd_device *obd = class_num2obd(i);
484
485                 if (obd == NULL)
486                         continue;
487                 if (obd->obd_stopping)
488                         status = "ST";
489                 else if (obd->obd_set_up)
490                         status = "UP";
491                 else if (obd->obd_attached)
492                         status = "AT";
493                 else
494                         status = "--";
495                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
496                          i, status, obd->obd_type->typ_name,
497                          obd->obd_name, obd->obd_uuid.uuid,
498                          cfs_atomic_read(&obd->obd_refcount));
499         }
500         cfs_read_unlock(&obd_dev_lock);
501         return;
502 }
503
504 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
505    specified, then only the client with that uuid is returned,
506    otherwise any client connected to the tgt is returned. */
507 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
508                                           const char * typ_name,
509                                           struct obd_uuid *grp_uuid)
510 {
511         int i;
512
513         cfs_read_lock(&obd_dev_lock);
514         for (i = 0; i < class_devno_max(); i++) {
515                 struct obd_device *obd = class_num2obd(i);
516
517                 if (obd == NULL)
518                         continue;
519                 if ((strncmp(obd->obd_type->typ_name, typ_name,
520                              strlen(typ_name)) == 0)) {
521                         if (obd_uuid_equals(tgt_uuid,
522                                             &obd->u.cli.cl_target_uuid) &&
523                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
524                                                          &obd->obd_uuid) : 1)) {
525                                 cfs_read_unlock(&obd_dev_lock);
526                                 return obd;
527                         }
528                 }
529         }
530         cfs_read_unlock(&obd_dev_lock);
531
532         return NULL;
533 }
534 EXPORT_SYMBOL(class_find_client_obd);
535
536 /* Iterate the obd_device list looking devices have grp_uuid. Start
537    searching at *next, and if a device is found, the next index to look
538    at is saved in *next. If next is NULL, then the first matching device
539    will always be returned. */
540 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
541 {
542         int i;
543
544         if (next == NULL)
545                 i = 0;
546         else if (*next >= 0 && *next < class_devno_max())
547                 i = *next;
548         else
549                 return NULL;
550
551         cfs_read_lock(&obd_dev_lock);
552         for (; i < class_devno_max(); i++) {
553                 struct obd_device *obd = class_num2obd(i);
554
555                 if (obd == NULL)
556                         continue;
557                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
558                         if (next != NULL)
559                                 *next = i+1;
560                         cfs_read_unlock(&obd_dev_lock);
561                         return obd;
562                 }
563         }
564         cfs_read_unlock(&obd_dev_lock);
565
566         return NULL;
567 }
568 EXPORT_SYMBOL(class_devices_in_group);
569
570 /**
571  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
572  * adjust sptlrpc settings accordingly.
573  */
574 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
575 {
576         struct obd_device  *obd;
577         const char         *type;
578         int                 i, rc = 0, rc2;
579
580         LASSERT(namelen > 0);
581
582         cfs_read_lock(&obd_dev_lock);
583         for (i = 0; i < class_devno_max(); i++) {
584                 obd = class_num2obd(i);
585
586                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
587                         continue;
588
589                 /* only notify mdc, osc, mdt, ost */
590                 type = obd->obd_type->typ_name;
591                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
592                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
593                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
594                     strcmp(type, LUSTRE_OST_NAME) != 0)
595                         continue;
596
597                 if (strncmp(obd->obd_name, fsname, namelen))
598                         continue;
599
600                 class_incref(obd, __FUNCTION__, obd);
601                 cfs_read_unlock(&obd_dev_lock);
602                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
603                                          sizeof(KEY_SPTLRPC_CONF),
604                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
605                 rc = rc ? rc : rc2;
606                 class_decref(obd, __FUNCTION__, obd);
607                 cfs_read_lock(&obd_dev_lock);
608         }
609         cfs_read_unlock(&obd_dev_lock);
610         return rc;
611 }
612 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
613
614 void obd_cleanup_caches(void)
615 {
616         int rc;
617
618         ENTRY;
619         if (obd_device_cachep) {
620                 rc = cfs_mem_cache_destroy(obd_device_cachep);
621                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
622                 obd_device_cachep = NULL;
623         }
624         if (obdo_cachep) {
625                 rc = cfs_mem_cache_destroy(obdo_cachep);
626                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
627                 obdo_cachep = NULL;
628         }
629         if (import_cachep) {
630                 rc = cfs_mem_cache_destroy(import_cachep);
631                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
632                 import_cachep = NULL;
633         }
634         if (capa_cachep) {
635                 rc = cfs_mem_cache_destroy(capa_cachep);
636                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
637                 capa_cachep = NULL;
638         }
639         EXIT;
640 }
641
642 int obd_init_caches(void)
643 {
644         ENTRY;
645
646         LASSERT(obd_device_cachep == NULL);
647         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
648                                                  sizeof(struct obd_device),
649                                                  0, 0);
650         if (!obd_device_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(obdo_cachep == NULL);
654         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
655                                            0, 0);
656         if (!obdo_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(import_cachep == NULL);
660         import_cachep = cfs_mem_cache_create("ll_import_cache",
661                                              sizeof(struct obd_import),
662                                              0, 0);
663         if (!import_cachep)
664                 GOTO(out, -ENOMEM);
665
666         LASSERT(capa_cachep == NULL);
667         capa_cachep = cfs_mem_cache_create("capa_cache",
668                                            sizeof(struct obd_capa), 0, 0);
669         if (!capa_cachep)
670                 GOTO(out, -ENOMEM);
671
672         RETURN(0);
673  out:
674         obd_cleanup_caches();
675         RETURN(-ENOMEM);
676
677 }
678
679 /* map connection to client */
680 struct obd_export *class_conn2export(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         ENTRY;
684
685         if (!conn) {
686                 CDEBUG(D_CACHE, "looking for null handle\n");
687                 RETURN(NULL);
688         }
689
690         if (conn->cookie == -1) {  /* this means assign a new connection */
691                 CDEBUG(D_CACHE, "want a new connection\n");
692                 RETURN(NULL);
693         }
694
695         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
696         export = class_handle2object(conn->cookie);
697         RETURN(export);
698 }
699 EXPORT_SYMBOL(class_conn2export);
700
701 struct obd_device *class_exp2obd(struct obd_export *exp)
702 {
703         if (exp)
704                 return exp->exp_obd;
705         return NULL;
706 }
707 EXPORT_SYMBOL(class_exp2obd);
708
709 struct obd_device *class_conn2obd(struct lustre_handle *conn)
710 {
711         struct obd_export *export;
712         export = class_conn2export(conn);
713         if (export) {
714                 struct obd_device *obd = export->exp_obd;
715                 class_export_put(export);
716                 return obd;
717         }
718         return NULL;
719 }
720 EXPORT_SYMBOL(class_conn2obd);
721
722 struct obd_import *class_exp2cliimp(struct obd_export *exp)
723 {
724         struct obd_device *obd = exp->exp_obd;
725         if (obd == NULL)
726                 return NULL;
727         return obd->u.cli.cl_import;
728 }
729 EXPORT_SYMBOL(class_exp2cliimp);
730
731 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
732 {
733         struct obd_device *obd = class_conn2obd(conn);
734         if (obd == NULL)
735                 return NULL;
736         return obd->u.cli.cl_import;
737 }
738 EXPORT_SYMBOL(class_conn2cliimp);
739
740 /* Export management functions */
741 static void class_export_destroy(struct obd_export *exp)
742 {
743         struct obd_device *obd = exp->exp_obd;
744         ENTRY;
745
746         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
747         LASSERT(obd != NULL);
748
749         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
750                exp->exp_client_uuid.uuid, obd->obd_name);
751
752         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
753         if (exp->exp_connection)
754                 ptlrpc_put_connection_superhack(exp->exp_connection);
755
756         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
757         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
758         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
759         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
760         obd_destroy_export(exp);
761         class_decref(obd, "export", exp);
762
763         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
764         EXIT;
765 }
766
767 static void export_handle_addref(void *export)
768 {
769         class_export_get(export);
770 }
771
772 static struct portals_handle_ops export_handle_ops = {
773         .hop_addref = export_handle_addref,
774         .hop_free   = NULL,
775 };
776
777 struct obd_export *class_export_get(struct obd_export *exp)
778 {
779         cfs_atomic_inc(&exp->exp_refcount);
780         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
781                cfs_atomic_read(&exp->exp_refcount));
782         return exp;
783 }
784 EXPORT_SYMBOL(class_export_get);
785
786 void class_export_put(struct obd_export *exp)
787 {
788         LASSERT(exp != NULL);
789         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
790         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
791                cfs_atomic_read(&exp->exp_refcount) - 1);
792
793         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
794                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
795                 CDEBUG(D_IOCTL, "final put %p/%s\n",
796                        exp, exp->exp_client_uuid.uuid);
797
798                 /* release nid stat refererence */
799                 lprocfs_exp_cleanup(exp);
800
801                 obd_zombie_export_add(exp);
802         }
803 }
804 EXPORT_SYMBOL(class_export_put);
805
806 /* Creates a new export, adds it to the hash table, and returns a
807  * pointer to it. The refcount is 2: one for the hash reference, and
808  * one for the pointer returned by this function. */
809 struct obd_export *class_new_export(struct obd_device *obd,
810                                     struct obd_uuid *cluuid)
811 {
812         struct obd_export *export;
813         cfs_hash_t *hash = NULL;
814         int rc = 0;
815         ENTRY;
816
817         OBD_ALLOC_PTR(export);
818         if (!export)
819                 return ERR_PTR(-ENOMEM);
820
821         export->exp_conn_cnt = 0;
822         export->exp_lock_hash = NULL;
823         export->exp_flock_hash = NULL;
824         cfs_atomic_set(&export->exp_refcount, 2);
825         cfs_atomic_set(&export->exp_rpc_count, 0);
826         cfs_atomic_set(&export->exp_cb_count, 0);
827         cfs_atomic_set(&export->exp_locks_count, 0);
828 #if LUSTRE_TRACKS_LOCK_EXP_REFS
829         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
830         cfs_spin_lock_init(&export->exp_locks_list_guard);
831 #endif
832         cfs_atomic_set(&export->exp_replay_count, 0);
833         export->exp_obd = obd;
834         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
835         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
836         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
837         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
838         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
839         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
840         class_handle_hash(&export->exp_handle, &export_handle_ops);
841         export->exp_last_request_time = cfs_time_current_sec();
842         cfs_spin_lock_init(&export->exp_lock);
843         cfs_spin_lock_init(&export->exp_rpc_lock);
844         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
845         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
846         cfs_spin_lock_init(&export->exp_bl_list_lock);
847         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
848
849         export->exp_sp_peer = LUSTRE_SP_ANY;
850         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
851         export->exp_client_uuid = *cluuid;
852         obd_init_export(export);
853
854         cfs_spin_lock(&obd->obd_dev_lock);
855          /* shouldn't happen, but might race */
856         if (obd->obd_stopping)
857                 GOTO(exit_unlock, rc = -ENODEV);
858
859         hash = cfs_hash_getref(obd->obd_uuid_hash);
860         if (hash == NULL)
861                 GOTO(exit_unlock, rc = -ENODEV);
862         cfs_spin_unlock(&obd->obd_dev_lock);
863
864         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
865                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
866                 if (rc != 0) {
867                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
868                                       obd->obd_name, cluuid->uuid, rc);
869                         GOTO(exit_err, rc = -EALREADY);
870                 }
871         }
872
873         cfs_spin_lock(&obd->obd_dev_lock);
874         if (obd->obd_stopping) {
875                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
876                 GOTO(exit_unlock, rc = -ENODEV);
877         }
878
879         class_incref(obd, "export", export);
880         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
881         cfs_list_add_tail(&export->exp_obd_chain_timed,
882                           &export->exp_obd->obd_exports_timed);
883         export->exp_obd->obd_num_exports++;
884         cfs_spin_unlock(&obd->obd_dev_lock);
885         cfs_hash_putref(hash);
886         RETURN(export);
887
888 exit_unlock:
889         cfs_spin_unlock(&obd->obd_dev_lock);
890 exit_err:
891         if (hash)
892                 cfs_hash_putref(hash);
893         class_handle_unhash(&export->exp_handle);
894         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
895         obd_destroy_export(export);
896         OBD_FREE_PTR(export);
897         return ERR_PTR(rc);
898 }
899 EXPORT_SYMBOL(class_new_export);
900
901 void class_unlink_export(struct obd_export *exp)
902 {
903         class_handle_unhash(&exp->exp_handle);
904
905         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
906         /* delete an uuid-export hashitem from hashtables */
907         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
908                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
909                              &exp->exp_client_uuid,
910                              &exp->exp_uuid_hash);
911
912         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
913         cfs_list_del_init(&exp->exp_obd_chain_timed);
914         exp->exp_obd->obd_num_exports--;
915         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
916         class_export_put(exp);
917 }
918 EXPORT_SYMBOL(class_unlink_export);
919
920 /* Import management functions */
921 void class_import_destroy(struct obd_import *imp)
922 {
923         ENTRY;
924
925         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
926                 imp->imp_obd->obd_name);
927
928         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
929
930         ptlrpc_put_connection_superhack(imp->imp_connection);
931
932         while (!cfs_list_empty(&imp->imp_conn_list)) {
933                 struct obd_import_conn *imp_conn;
934
935                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
936                                           struct obd_import_conn, oic_item);
937                 cfs_list_del_init(&imp_conn->oic_item);
938                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
939                 OBD_FREE(imp_conn, sizeof(*imp_conn));
940         }
941
942         LASSERT(imp->imp_sec == NULL);
943         class_decref(imp->imp_obd, "import", imp);
944         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
945         EXIT;
946 }
947
/**
 * Handle-table "addref" callback for imports: takes one reference on the
 * import via class_import_get().
 *
 * \param import  the struct obd_import registered with the handle
 */
static void import_handle_addref(void *import)
{
        (void)class_import_get(import);
}
952
953 static struct portals_handle_ops import_handle_ops = {
954         .hop_addref = import_handle_addref,
955         .hop_free   = NULL,
956 };
957
958 struct obd_import *class_import_get(struct obd_import *import)
959 {
960         cfs_atomic_inc(&import->imp_refcount);
961         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
962                cfs_atomic_read(&import->imp_refcount),
963                import->imp_obd->obd_name);
964         return import;
965 }
966 EXPORT_SYMBOL(class_import_get);
967
968 void class_import_put(struct obd_import *imp)
969 {
970         ENTRY;
971
972         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
973         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
974
975         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
976                cfs_atomic_read(&imp->imp_refcount) - 1,
977                imp->imp_obd->obd_name);
978
979         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
980                 CDEBUG(D_INFO, "final put import %p\n", imp);
981                 obd_zombie_import_add(imp);
982         }
983
984         /* catch possible import put race */
985         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
986         EXIT;
987 }
988 EXPORT_SYMBOL(class_import_put);
989
990 static void init_imp_at(struct imp_at *at) {
991         int i;
992         at_init(&at->iat_net_latency, 0, 0);
993         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
994                 /* max service estimates are tracked on the server side, so
995                    don't use the AT history here, just use the last reported
996                    val. (But keep hist for proc histogram, worst_ever) */
997                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
998                         AT_FLG_NOHIST);
999         }
1000 }
1001
1002 struct obd_import *class_new_import(struct obd_device *obd)
1003 {
1004         struct obd_import *imp;
1005
1006         OBD_ALLOC(imp, sizeof(*imp));
1007         if (imp == NULL)
1008                 return NULL;
1009
1010         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1011         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1012         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1013         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1014         cfs_spin_lock_init(&imp->imp_lock);
1015         imp->imp_last_success_conn = 0;
1016         imp->imp_state = LUSTRE_IMP_NEW;
1017         imp->imp_obd = class_incref(obd, "import", imp);
1018         cfs_mutex_init(&imp->imp_sec_mutex);
1019         cfs_waitq_init(&imp->imp_recovery_waitq);
1020
1021         cfs_atomic_set(&imp->imp_refcount, 2);
1022         cfs_atomic_set(&imp->imp_unregistering, 0);
1023         cfs_atomic_set(&imp->imp_inflight, 0);
1024         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1025         cfs_atomic_set(&imp->imp_inval_count, 0);
1026         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1027         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1028         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1029         init_imp_at(&imp->imp_at);
1030
1031         /* the default magic is V2, will be used in connect RPC, and
1032          * then adjusted according to the flags in request/reply. */
1033         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1034
1035         return imp;
1036 }
1037 EXPORT_SYMBOL(class_new_import);
1038
1039 void class_destroy_import(struct obd_import *import)
1040 {
1041         LASSERT(import != NULL);
1042         LASSERT(import != LP_POISON);
1043
1044         class_handle_unhash(&import->imp_handle);
1045
1046         cfs_spin_lock(&import->imp_lock);
1047         import->imp_generation++;
1048         cfs_spin_unlock(&import->imp_lock);
1049         class_import_put(import);
1050 }
1051 EXPORT_SYMBOL(class_destroy_import);
1052
1053 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1054
1055 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1056 {
1057         cfs_spin_lock(&exp->exp_locks_list_guard);
1058
1059         LASSERT(lock->l_exp_refs_nr >= 0);
1060
1061         if (lock->l_exp_refs_target != NULL &&
1062             lock->l_exp_refs_target != exp) {
1063                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1064                               exp, lock, lock->l_exp_refs_target);
1065         }
1066         if ((lock->l_exp_refs_nr ++) == 0) {
1067                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1068                 lock->l_exp_refs_target = exp;
1069         }
1070         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071                lock, exp, lock->l_exp_refs_nr);
1072         cfs_spin_unlock(&exp->exp_locks_list_guard);
1073 }
1074 EXPORT_SYMBOL(__class_export_add_lock_ref);
1075
1076 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1077 {
1078         cfs_spin_lock(&exp->exp_locks_list_guard);
1079         LASSERT(lock->l_exp_refs_nr > 0);
1080         if (lock->l_exp_refs_target != exp) {
1081                 LCONSOLE_WARN("lock %p, "
1082                               "mismatching export pointers: %p, %p\n",
1083                               lock, lock->l_exp_refs_target, exp);
1084         }
1085         if (-- lock->l_exp_refs_nr == 0) {
1086                 cfs_list_del_init(&lock->l_exp_refs_link);
1087                 lock->l_exp_refs_target = NULL;
1088         }
1089         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1090                lock, exp, lock->l_exp_refs_nr);
1091         cfs_spin_unlock(&exp->exp_locks_list_guard);
1092 }
1093 EXPORT_SYMBOL(__class_export_del_lock_ref);
1094 #endif
1095
1096 /* A connection defines an export context in which preallocation can
1097    be managed. This releases the export pointer reference, and returns
1098    the export handle, so the export refcount is 1 when this function
1099    returns. */
1100 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1101                   struct obd_uuid *cluuid)
1102 {
1103         struct obd_export *export;
1104         LASSERT(conn != NULL);
1105         LASSERT(obd != NULL);
1106         LASSERT(cluuid != NULL);
1107         ENTRY;
1108
1109         export = class_new_export(obd, cluuid);
1110         if (IS_ERR(export))
1111                 RETURN(PTR_ERR(export));
1112
1113         conn->cookie = export->exp_handle.h_cookie;
1114         class_export_put(export);
1115
1116         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1117                cluuid->uuid, conn->cookie);
1118         RETURN(0);
1119 }
1120 EXPORT_SYMBOL(class_connect);
1121
1122 /* if export is involved in recovery then clean up related things */
1123 void class_export_recovery_cleanup(struct obd_export *exp)
1124 {
1125         struct obd_device *obd = exp->exp_obd;
1126
1127         cfs_spin_lock(&obd->obd_recovery_task_lock);
1128         if (exp->exp_delayed)
1129                 obd->obd_delayed_clients--;
1130         if (obd->obd_recovering && exp->exp_in_recovery) {
1131                 cfs_spin_lock(&exp->exp_lock);
1132                 exp->exp_in_recovery = 0;
1133                 cfs_spin_unlock(&exp->exp_lock);
1134                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1135                 cfs_atomic_dec(&obd->obd_connected_clients);
1136         }
1137         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1138         /** Cleanup req replay fields */
1139         if (exp->exp_req_replay_needed) {
1140                 cfs_spin_lock(&exp->exp_lock);
1141                 exp->exp_req_replay_needed = 0;
1142                 cfs_spin_unlock(&exp->exp_lock);
1143                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1144                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1145         }
1146         /** Cleanup lock replay data */
1147         if (exp->exp_lock_replay_needed) {
1148                 cfs_spin_lock(&exp->exp_lock);
1149                 exp->exp_lock_replay_needed = 0;
1150                 cfs_spin_unlock(&exp->exp_lock);
1151                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1152                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1153         }
1154 }
1155
1156 /* This function removes 1-3 references from the export:
1157  * 1 - for export pointer passed
1158  * and if disconnect really need
1159  * 2 - removing from hash
1160  * 3 - in client_unlink_export
1161  * The export pointer passed to this function can destroyed */
1162 int class_disconnect(struct obd_export *export)
1163 {
1164         int already_disconnected;
1165         ENTRY;
1166
1167         if (export == NULL) {
1168                 CWARN("attempting to free NULL export %p\n", export);
1169                 RETURN(-EINVAL);
1170         }
1171
1172         cfs_spin_lock(&export->exp_lock);
1173         already_disconnected = export->exp_disconnected;
1174         export->exp_disconnected = 1;
1175         cfs_spin_unlock(&export->exp_lock);
1176
1177         /* class_cleanup(), abort_recovery(), and class_fail_export()
1178          * all end up in here, and if any of them race we shouldn't
1179          * call extra class_export_puts(). */
1180         if (already_disconnected) {
1181                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1182                 GOTO(no_disconn, already_disconnected);
1183         }
1184
1185         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1186                export->exp_handle.h_cookie);
1187
1188         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1189                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1190                              &export->exp_connection->c_peer.nid,
1191                              &export->exp_nid_hash);
1192
1193         class_export_recovery_cleanup(export);
1194         class_unlink_export(export);
1195 no_disconn:
1196         class_export_put(export);
1197         RETURN(0);
1198 }
1199 EXPORT_SYMBOL(class_disconnect);
1200
1201 /* Return non-zero for a fully connected export */
1202 int class_connected_export(struct obd_export *exp)
1203 {
1204         if (exp) {
1205                 int connected;
1206                 cfs_spin_lock(&exp->exp_lock);
1207                 connected = (exp->exp_conn_cnt > 0);
1208                 cfs_spin_unlock(&exp->exp_lock);
1209                 return connected;
1210         }
1211         return 0;
1212 }
1213 EXPORT_SYMBOL(class_connected_export);
1214
1215 static void class_disconnect_export_list(cfs_list_t *list,
1216                                          enum obd_option flags)
1217 {
1218         int rc;
1219         struct obd_export *exp;
1220         ENTRY;
1221
1222         /* It's possible that an export may disconnect itself, but
1223          * nothing else will be added to this list. */
1224         while (!cfs_list_empty(list)) {
1225                 exp = cfs_list_entry(list->next, struct obd_export,
1226                                      exp_obd_chain);
1227                 /* need for safe call CDEBUG after obd_disconnect */
1228                 class_export_get(exp);
1229
1230                 cfs_spin_lock(&exp->exp_lock);
1231                 exp->exp_flags = flags;
1232                 cfs_spin_unlock(&exp->exp_lock);
1233
1234                 if (obd_uuid_equals(&exp->exp_client_uuid,
1235                                     &exp->exp_obd->obd_uuid)) {
1236                         CDEBUG(D_HA,
1237                                "exp %p export uuid == obd uuid, don't discon\n",
1238                                exp);
1239                         /* Need to delete this now so we don't end up pointing
1240                          * to work_list later when this export is cleaned up. */
1241                         cfs_list_del_init(&exp->exp_obd_chain);
1242                         class_export_put(exp);
1243                         continue;
1244                 }
1245
1246                 class_export_get(exp);
1247                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1248                        "last request at "CFS_TIME_T"\n",
1249                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1250                        exp, exp->exp_last_request_time);
1251                 /* release one export reference anyway */
1252                 rc = obd_disconnect(exp);
1253
1254                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1255                        obd_export_nid2str(exp), exp, rc);
1256                 class_export_put(exp);
1257         }
1258         EXIT;
1259 }
1260
1261 void class_disconnect_exports(struct obd_device *obd)
1262 {
1263         cfs_list_t work_list;
1264         ENTRY;
1265
1266         /* Move all of the exports from obd_exports to a work list, en masse. */
1267         CFS_INIT_LIST_HEAD(&work_list);
1268         cfs_spin_lock(&obd->obd_dev_lock);
1269         cfs_list_splice_init(&obd->obd_exports, &work_list);
1270         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1271         cfs_spin_unlock(&obd->obd_dev_lock);
1272
1273         if (!cfs_list_empty(&work_list)) {
1274                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1275                        "disconnecting them\n", obd->obd_minor, obd);
1276                 class_disconnect_export_list(&work_list,
1277                                              exp_flags_from_obd(obd));
1278         } else
1279                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1280                        obd->obd_minor, obd);
1281         EXIT;
1282 }
1283 EXPORT_SYMBOL(class_disconnect_exports);
1284
1285 /* Remove exports that have not completed recovery.
1286  */
1287 void class_disconnect_stale_exports(struct obd_device *obd,
1288                                     int (*test_export)(struct obd_export *))
1289 {
1290         cfs_list_t work_list;
1291         struct obd_export *exp, *n;
1292         int evicted = 0;
1293         ENTRY;
1294
1295         CFS_INIT_LIST_HEAD(&work_list);
1296         cfs_spin_lock(&obd->obd_dev_lock);
1297         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1298                                      exp_obd_chain) {
1299                 /* don't count self-export as client */
1300                 if (obd_uuid_equals(&exp->exp_client_uuid,
1301                                     &exp->exp_obd->obd_uuid))
1302                         continue;
1303
1304                 /* don't evict clients which have no slot in last_rcvd
1305                  * (e.g. lightweight connection) */
1306                 if (exp->exp_target_data.ted_lr_idx == -1)
1307                         continue;
1308
1309                 cfs_spin_lock(&exp->exp_lock);
1310                 if (test_export(exp)) {
1311                         cfs_spin_unlock(&exp->exp_lock);
1312                         continue;
1313                 }
1314                 exp->exp_failed = 1;
1315                 cfs_spin_unlock(&exp->exp_lock);
1316
1317                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1318                 evicted++;
1319                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1320                        obd->obd_name, exp->exp_client_uuid.uuid,
1321                        exp->exp_connection == NULL ? "<unknown>" :
1322                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1323                 print_export_data(exp, "EVICTING", 0);
1324         }
1325         cfs_spin_unlock(&obd->obd_dev_lock);
1326
1327         if (evicted) {
1328                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1329                               obd->obd_name, evicted);
1330                 obd->obd_stale_clients += evicted;
1331         }
1332         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1333                                                  OBD_OPT_ABORT_RECOV);
1334         EXIT;
1335 }
1336 EXPORT_SYMBOL(class_disconnect_stale_exports);
1337
1338 void class_fail_export(struct obd_export *exp)
1339 {
1340         int rc, already_failed;
1341
1342         cfs_spin_lock(&exp->exp_lock);
1343         already_failed = exp->exp_failed;
1344         exp->exp_failed = 1;
1345         cfs_spin_unlock(&exp->exp_lock);
1346
1347         if (already_failed) {
1348                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1349                        exp, exp->exp_client_uuid.uuid);
1350                 return;
1351         }
1352
1353         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1354                exp, exp->exp_client_uuid.uuid);
1355
1356         if (obd_dump_on_timeout)
1357                 libcfs_debug_dumplog();
1358
1359         /* need for safe call CDEBUG after obd_disconnect */
1360         class_export_get(exp);
1361
1362         /* Most callers into obd_disconnect are removing their own reference
1363          * (request, for example) in addition to the one from the hash table.
1364          * We don't have such a reference here, so make one. */
1365         class_export_get(exp);
1366         rc = obd_disconnect(exp);
1367         if (rc)
1368                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1369         else
1370                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1371                        exp, exp->exp_client_uuid.uuid);
1372         class_export_put(exp);
1373 }
1374 EXPORT_SYMBOL(class_fail_export);
1375
1376 char *obd_export_nid2str(struct obd_export *exp)
1377 {
1378         if (exp->exp_connection != NULL)
1379                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1380
1381         return "(no nid)";
1382 }
1383 EXPORT_SYMBOL(obd_export_nid2str);
1384
1385 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1386 {
1387         struct obd_export *doomed_exp = NULL;
1388         int exports_evicted = 0;
1389
1390         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1391
1392         do {
1393                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1394                 if (doomed_exp == NULL)
1395                         break;
1396
1397                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1398                          "nid %s found, wanted nid %s, requested nid %s\n",
1399                          obd_export_nid2str(doomed_exp),
1400                          libcfs_nid2str(nid_key), nid);
1401                 LASSERTF(doomed_exp != obd->obd_self_export,
1402                          "self-export is hashed by NID?\n");
1403                 exports_evicted++;
1404                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1405                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1406                        exports_evicted);
1407                 class_fail_export(doomed_exp);
1408                 class_export_put(doomed_exp);
1409         } while (1);
1410
1411         if (!exports_evicted)
1412                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1413                        obd->obd_name, nid);
1414         return exports_evicted;
1415 }
1416 EXPORT_SYMBOL(obd_export_evict_by_nid);
1417
1418 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1419 {
1420         struct obd_export *doomed_exp = NULL;
1421         struct obd_uuid doomed_uuid;
1422         int exports_evicted = 0;
1423
1424         obd_str2uuid(&doomed_uuid, uuid);
1425         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1426                 CERROR("%s: can't evict myself\n", obd->obd_name);
1427                 return exports_evicted;
1428         }
1429
1430         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1431
1432         if (doomed_exp == NULL) {
1433                 CERROR("%s: can't disconnect %s: no exports found\n",
1434                        obd->obd_name, uuid);
1435         } else {
1436                 CWARN("%s: evicting %s at adminstrative request\n",
1437                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1438                 class_fail_export(doomed_exp);
1439                 class_export_put(doomed_exp);
1440                 exports_evicted++;
1441         }
1442
1443         return exports_evicted;
1444 }
1445 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1446
1447 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1448 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1449 EXPORT_SYMBOL(class_export_dump_hook);
1450 #endif
1451
1452 static void print_export_data(struct obd_export *exp, const char *status,
1453                               int locks)
1454 {
1455         struct ptlrpc_reply_state *rs;
1456         struct ptlrpc_reply_state *first_reply = NULL;
1457         int nreplies = 0;
1458
1459         cfs_spin_lock(&exp->exp_lock);
1460         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1461                                 rs_exp_list) {
1462                 if (nreplies == 0)
1463                         first_reply = rs;
1464                 nreplies++;
1465         }
1466         cfs_spin_unlock(&exp->exp_lock);
1467
1468         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1469                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1470                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1471                cfs_atomic_read(&exp->exp_rpc_count),
1472                cfs_atomic_read(&exp->exp_cb_count),
1473                cfs_atomic_read(&exp->exp_locks_count),
1474                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1475                nreplies, first_reply, nreplies > 3 ? "..." : "",
1476                exp->exp_last_committed);
1477 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1478         if (locks && class_export_dump_hook != NULL)
1479                 class_export_dump_hook(exp);
1480 #endif
1481 }
1482
1483 void dump_exports(struct obd_device *obd, int locks)
1484 {
1485         struct obd_export *exp;
1486
1487         cfs_spin_lock(&obd->obd_dev_lock);
1488         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1489                 print_export_data(exp, "ACTIVE", locks);
1490         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1491                 print_export_data(exp, "UNLINKED", locks);
1492         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1493                 print_export_data(exp, "DELAYED", locks);
1494         cfs_spin_unlock(&obd->obd_dev_lock);
1495         cfs_spin_lock(&obd_zombie_impexp_lock);
1496         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1497                 print_export_data(exp, "ZOMBIE", locks);
1498         cfs_spin_unlock(&obd_zombie_impexp_lock);
1499 }
1500 EXPORT_SYMBOL(dump_exports);
1501
1502 void obd_exports_barrier(struct obd_device *obd)
1503 {
1504         int waited = 2;
1505         LASSERT(cfs_list_empty(&obd->obd_exports));
1506         cfs_spin_lock(&obd->obd_dev_lock);
1507         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1508                 cfs_spin_unlock(&obd->obd_dev_lock);
1509                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1510                                                    cfs_time_seconds(waited));
1511                 if (waited > 5 && IS_PO2(waited)) {
1512                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1513                                       "more than %d seconds. "
1514                                       "The obd refcount = %d. Is it stuck?\n",
1515                                       obd->obd_name, waited,
1516                                       cfs_atomic_read(&obd->obd_refcount));
1517                         dump_exports(obd, 1);
1518                 }
1519                 waited *= 2;
1520                 cfs_spin_lock(&obd->obd_dev_lock);
1521         }
1522         cfs_spin_unlock(&obd->obd_dev_lock);
1523 }
1524 EXPORT_SYMBOL(obd_exports_barrier);
1525
/* Number of zombie imports and exports still waiting to be destroyed;
 * every access in this file is made under obd_zombie_impexp_lock. */
static int zombies_count;
1528
1529 /**
1530  * kill zombie imports and exports
1531  */
1532 void obd_zombie_impexp_cull(void)
1533 {
1534         struct obd_import *import;
1535         struct obd_export *export;
1536         ENTRY;
1537
1538         do {
1539                 cfs_spin_lock(&obd_zombie_impexp_lock);
1540
1541                 import = NULL;
1542                 if (!cfs_list_empty(&obd_zombie_imports)) {
1543                         import = cfs_list_entry(obd_zombie_imports.next,
1544                                                 struct obd_import,
1545                                                 imp_zombie_chain);
1546                         cfs_list_del_init(&import->imp_zombie_chain);
1547                 }
1548
1549                 export = NULL;
1550                 if (!cfs_list_empty(&obd_zombie_exports)) {
1551                         export = cfs_list_entry(obd_zombie_exports.next,
1552                                                 struct obd_export,
1553                                                 exp_obd_chain);
1554                         cfs_list_del_init(&export->exp_obd_chain);
1555                 }
1556
1557                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1558
1559                 if (import != NULL) {
1560                         class_import_destroy(import);
1561                         cfs_spin_lock(&obd_zombie_impexp_lock);
1562                         zombies_count--;
1563                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1564                 }
1565
1566                 if (export != NULL) {
1567                         class_export_destroy(export);
1568                         cfs_spin_lock(&obd_zombie_impexp_lock);
1569                         zombies_count--;
1570                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1571                 }
1572
1573                 cfs_cond_resched();
1574         } while (import != NULL || export != NULL);
1575         EXIT;
1576 }
1577
1578 static cfs_completion_t         obd_zombie_start;
1579 static cfs_completion_t         obd_zombie_stop;
1580 static unsigned long            obd_zombie_flags;
1581 static cfs_waitq_t              obd_zombie_waitq;
1582 static pid_t                    obd_zombie_pid;
1583
1584 enum {
1585         OBD_ZOMBIE_STOP   = 1 << 1
1586 };
1587
1588 /**
1589  * check for work for kill zombie import/export thread.
1590  */
1591 static int obd_zombie_impexp_check(void *arg)
1592 {
1593         int rc;
1594
1595         cfs_spin_lock(&obd_zombie_impexp_lock);
1596         rc = (zombies_count == 0) &&
1597              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1598         cfs_spin_unlock(&obd_zombie_impexp_lock);
1599
1600         RETURN(rc);
1601 }
1602
1603 /**
1604  * Add export to the obd_zombe thread and notify it.
1605  */
1606 static void obd_zombie_export_add(struct obd_export *exp) {
1607         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1608         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1609         cfs_list_del_init(&exp->exp_obd_chain);
1610         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1611         cfs_spin_lock(&obd_zombie_impexp_lock);
1612         zombies_count++;
1613         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1614         cfs_spin_unlock(&obd_zombie_impexp_lock);
1615
1616         obd_zombie_impexp_notify();
1617 }
1618
1619 /**
1620  * Add import to the obd_zombe thread and notify it.
1621  */
1622 static void obd_zombie_import_add(struct obd_import *imp) {
1623         LASSERT(imp->imp_sec == NULL);
1624         LASSERT(imp->imp_rq_pool == NULL);
1625         cfs_spin_lock(&obd_zombie_impexp_lock);
1626         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1627         zombies_count++;
1628         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1629         cfs_spin_unlock(&obd_zombie_impexp_lock);
1630
1631         obd_zombie_impexp_notify();
1632 }
1633
1634 /**
1635  * notify import/export destroy thread about new zombie.
1636  */
1637 static void obd_zombie_impexp_notify(void)
1638 {
1639         /*
1640          * Make sure obd_zomebie_impexp_thread get this notification.
1641          * It is possible this signal only get by obd_zombie_barrier, and
1642          * barrier gulps this notification and sleeps away and hangs ensues
1643          */
1644         cfs_waitq_broadcast(&obd_zombie_waitq);
1645 }
1646
1647 /**
1648  * check whether obd_zombie is idle
1649  */
1650 static int obd_zombie_is_idle(void)
1651 {
1652         int rc;
1653
1654         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1655         cfs_spin_lock(&obd_zombie_impexp_lock);
1656         rc = (zombies_count == 0);
1657         cfs_spin_unlock(&obd_zombie_impexp_lock);
1658         return rc;
1659 }
1660
1661 /**
1662  * wait when obd_zombie import/export queues become empty
1663  */
1664 void obd_zombie_barrier(void)
1665 {
1666         struct l_wait_info lwi = { 0 };
1667
1668         if (obd_zombie_pid == cfs_curproc_pid())
1669                 /* don't wait for myself */
1670                 return;
1671         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1672 }
1673 EXPORT_SYMBOL(obd_zombie_barrier);
1674
1675 #ifdef __KERNEL__
1676
1677 /**
1678  * destroy zombie export/import thread.
1679  */
1680 static int obd_zombie_impexp_thread(void *unused)
1681 {
1682         int rc;
1683
1684         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1685                 cfs_complete(&obd_zombie_start);
1686                 RETURN(rc);
1687         }
1688
1689         cfs_complete(&obd_zombie_start);
1690
1691         obd_zombie_pid = cfs_curproc_pid();
1692
1693         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1694                 struct l_wait_info lwi = { 0 };
1695
1696                 l_wait_event(obd_zombie_waitq,
1697                              !obd_zombie_impexp_check(NULL), &lwi);
1698                 obd_zombie_impexp_cull();
1699
1700                 /*
1701                  * Notify obd_zombie_barrier callers that queues
1702                  * may be empty.
1703                  */
1704                 cfs_waitq_signal(&obd_zombie_waitq);
1705         }
1706
1707         cfs_complete(&obd_zombie_stop);
1708
1709         RETURN(0);
1710 }
1711
1712 #else /* ! KERNEL */
1713
1714 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1715 static void *obd_zombie_impexp_work_cb;
1716 static void *obd_zombie_impexp_idle_cb;
1717
1718 int obd_zombie_impexp_kill(void *arg)
1719 {
1720         int rc = 0;
1721
1722         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1723                 obd_zombie_impexp_cull();
1724                 rc = 1;
1725         }
1726         cfs_atomic_dec(&zombie_recur);
1727         return rc;
1728 }
1729
1730 #endif
1731
1732 /**
1733  * start destroy zombie import/export thread
1734  */
1735 int obd_zombie_impexp_init(void)
1736 {
1737         int rc;
1738
1739         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1740         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1741         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1742         cfs_init_completion(&obd_zombie_start);
1743         cfs_init_completion(&obd_zombie_stop);
1744         cfs_waitq_init(&obd_zombie_waitq);
1745         obd_zombie_pid = 0;
1746
1747 #ifdef __KERNEL__
1748         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1749         if (rc < 0)
1750                 RETURN(rc);
1751
1752         cfs_wait_for_completion(&obd_zombie_start);
1753 #else
1754
1755         obd_zombie_impexp_work_cb =
1756                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1757                                                  &obd_zombie_impexp_kill, NULL);
1758
1759         obd_zombie_impexp_idle_cb =
1760                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1761                                                  &obd_zombie_impexp_check, NULL);
1762         rc = 0;
1763 #endif
1764         RETURN(rc);
1765 }
1766 /**
1767  * stop destroy zombie import/export thread
1768  */
1769 void obd_zombie_impexp_stop(void)
1770 {
1771         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1772         obd_zombie_impexp_notify();
1773 #ifdef __KERNEL__
1774         cfs_wait_for_completion(&obd_zombie_stop);
1775 #else
1776         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1777         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1778 #endif
1779 }
1780
1781 /***** Kernel-userspace comm helpers *******/
1782
1783 /* Get length of entire message, including header */
1784 int kuc_len(int payload_len)
1785 {
1786         return sizeof(struct kuc_hdr) + payload_len;
1787 }
1788 EXPORT_SYMBOL(kuc_len);
1789
1790 /* Get a pointer to kuc header, given a ptr to the payload
1791  * @param p Pointer to payload area
1792  * @returns Pointer to kuc header
1793  */
1794 struct kuc_hdr * kuc_ptr(void *p)
1795 {
1796         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1797         LASSERT(lh->kuc_magic == KUC_MAGIC);
1798         return lh;
1799 }
1800 EXPORT_SYMBOL(kuc_ptr);
1801
1802 /* Test if payload is part of kuc message
1803  * @param p Pointer to payload area
1804  * @returns boolean
1805  */
1806 int kuc_ispayload(void *p)
1807 {
1808         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1809
1810         if (kh->kuc_magic == KUC_MAGIC)
1811                 return 1;
1812         else
1813                 return 0;
1814 }
1815 EXPORT_SYMBOL(kuc_ispayload);
1816
1817 /* Alloc space for a message, and fill in header
1818  * @return Pointer to payload area
1819  */
1820 void *kuc_alloc(int payload_len, int transport, int type)
1821 {
1822         struct kuc_hdr *lh;
1823         int len = kuc_len(payload_len);
1824
1825         OBD_ALLOC(lh, len);
1826         if (lh == NULL)
1827                 return ERR_PTR(-ENOMEM);
1828
1829         lh->kuc_magic = KUC_MAGIC;
1830         lh->kuc_transport = transport;
1831         lh->kuc_msgtype = type;
1832         lh->kuc_msglen = len;
1833
1834         return (void *)(lh + 1);
1835 }
1836 EXPORT_SYMBOL(kuc_alloc);
1837
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length; the header size is added to it
 *        via kuc_len() to get the size passed to OBD_FREE
 */
inline void kuc_free(void *p, int payload_len)
{
        /* kuc_ptr() asserts that a valid header precedes the payload. */
        struct kuc_hdr *hdr = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1845
1846
1847