Whamcloud - gitweb
LU-2275: obdclass: Proper error cleaup for class_newdev
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!cfs_request_module("%s", modname)) {
129                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130                         type = class_search_type(name);
131                 } else {
132                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133                                            modname);
134                 }
135         }
136 #endif
137         if (type) {
138                 cfs_spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 cfs_try_module_get(type->typ_dt_ops->o_owner);
141                 cfs_spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         cfs_spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         cfs_module_put(type->typ_dt_ops->o_owner);
153         cfs_spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165         ENTRY;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 RETURN(-EEXIST);
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 RETURN(rc);
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         cfs_spin_lock_init(&type->obd_type_lock);
195
196 #ifdef LPROCFS
197         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
198                                               vars, type);
199         if (IS_ERR(type->typ_procroot)) {
200                 rc = PTR_ERR(type->typ_procroot);
201                 type->typ_procroot = NULL;
202                 GOTO (failed, rc);
203         }
204 #endif
205         if (ldt != NULL) {
206                 type->typ_lu = ldt;
207                 rc = lu_device_type_init(ldt);
208                 if (rc != 0)
209                         GOTO (failed, rc);
210         }
211
212         cfs_spin_lock(&obd_types_lock);
213         cfs_list_add(&type->typ_chain, &obd_types);
214         cfs_spin_unlock(&obd_types_lock);
215
216         RETURN (0);
217
218  failed:
219         if (type->typ_name != NULL)
220                 OBD_FREE(type->typ_name, strlen(name) + 1);
221         if (type->typ_md_ops != NULL)
222                 OBD_FREE_PTR(type->typ_md_ops);
223         if (type->typ_dt_ops != NULL)
224                 OBD_FREE_PTR(type->typ_dt_ops);
225         OBD_FREE(type, sizeof(*type));
226         RETURN(rc);
227 }
228 EXPORT_SYMBOL(class_register_type);
229
230 int class_unregister_type(const char *name)
231 {
232         struct obd_type *type = class_search_type(name);
233         ENTRY;
234
235         if (!type) {
236                 CERROR("unknown obd type\n");
237                 RETURN(-EINVAL);
238         }
239
240         if (type->typ_refcnt) {
241                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242                 /* This is a bad situation, let's make the best of it */
243                 /* Remove ops, but leave the name for debugging */
244                 OBD_FREE_PTR(type->typ_dt_ops);
245                 OBD_FREE_PTR(type->typ_md_ops);
246                 RETURN(-EBUSY);
247         }
248
249         /* we do not use type->typ_procroot as for compatibility purposes
250          * other modules can share names (i.e. lod can use lov entry). so
251          * we can't reference pointer as it can get invalided when another
252          * module removes the entry */
253         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
254
255         if (type->typ_lu)
256                 lu_device_type_fini(type->typ_lu);
257
258         cfs_spin_lock(&obd_types_lock);
259         cfs_list_del(&type->typ_chain);
260         cfs_spin_unlock(&obd_types_lock);
261         OBD_FREE(type->typ_name, strlen(name) + 1);
262         if (type->typ_dt_ops != NULL)
263                 OBD_FREE_PTR(type->typ_dt_ops);
264         if (type->typ_md_ops != NULL)
265                 OBD_FREE_PTR(type->typ_md_ops);
266         OBD_FREE(type, sizeof(*type));
267         RETURN(0);
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
270
271 /**
272  * Create a new obd device.
273  *
274  * Find an empty slot in ::obd_devs[], create a new obd device in it.
275  *
276  * \param[in] type_name obd device type string.
277  * \param[in] name      obd device name.
278  *
279  * \retval NULL if create fails, otherwise return the obd device
280  *         pointer created.
281  */
282 struct obd_device *class_newdev(const char *type_name, const char *name)
283 {
284         struct obd_device *result = NULL;
285         struct obd_device *newdev;
286         struct obd_type *type = NULL;
287         int i;
288         int new_obd_minor = 0;
289         ENTRY;
290
291         if (strlen(name) >= MAX_OBD_NAME) {
292                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293                 RETURN(ERR_PTR(-EINVAL));
294         }
295
296         type = class_get_type(type_name);
297         if (type == NULL){
298                 CERROR("OBD: unknown type: %s\n", type_name);
299                 RETURN(ERR_PTR(-ENODEV));
300         }
301
302         newdev = obd_device_alloc();
303         if (newdev == NULL)
304                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
305
306         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
307
308         cfs_write_lock(&obd_dev_lock);
309         for (i = 0; i < class_devno_max(); i++) {
310                 struct obd_device *obd = class_num2obd(i);
311
312                 if (obd && obd->obd_name &&
313                     (strcmp(name, obd->obd_name) == 0)) {
314                         CERROR("Device %s already exists at %d, won't add\n",
315                                name, i);
316                         if (result) {
317                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318                                          "%p obd_magic %08x != %08x\n", result,
319                                          result->obd_magic, OBD_DEVICE_MAGIC);
320                                 LASSERTF(result->obd_minor == new_obd_minor,
321                                          "%p obd_minor %d != %d\n", result,
322                                          result->obd_minor, new_obd_minor);
323
324                                 obd_devs[result->obd_minor] = NULL;
325                                 result->obd_name[0]='\0';
326                          }
327                         result = ERR_PTR(-EEXIST);
328                         break;
329                 }
330                 if (!result && !obd) {
331                         result = newdev;
332                         result->obd_minor = i;
333                         new_obd_minor = i;
334                         result->obd_type = type;
335                         strncpy(result->obd_name, name,
336                                 sizeof(result->obd_name) - 1);
337                         obd_devs[i] = result;
338                 }
339         }
340         cfs_write_unlock(&obd_dev_lock);
341
342         if (result == NULL && i >= class_devno_max()) {
343                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
344                        class_devno_max());
345                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
346         }
347
348         if (IS_ERR(result))
349                 GOTO(out, result);
350
351         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
352                result->obd_name, result);
353
354         RETURN(result);
355 out:
356         obd_device_free(newdev);
357 out_type:
358         class_put_type(type);
359         return result;
360 }
361
362 void class_release_dev(struct obd_device *obd)
363 {
364         struct obd_type *obd_type = obd->obd_type;
365
366         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
367                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
368         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
369                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
370         LASSERT(obd_type != NULL);
371
372         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
373                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
374
375         cfs_write_lock(&obd_dev_lock);
376         obd_devs[obd->obd_minor] = NULL;
377         cfs_write_unlock(&obd_dev_lock);
378         obd_device_free(obd);
379
380         class_put_type(obd_type);
381 }
382
383 int class_name2dev(const char *name)
384 {
385         int i;
386
387         if (!name)
388                 return -1;
389
390         cfs_read_lock(&obd_dev_lock);
391         for (i = 0; i < class_devno_max(); i++) {
392                 struct obd_device *obd = class_num2obd(i);
393
394                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
395                         /* Make sure we finished attaching before we give
396                            out any references */
397                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
398                         if (obd->obd_attached) {
399                                 cfs_read_unlock(&obd_dev_lock);
400                                 return i;
401                         }
402                         break;
403                 }
404         }
405         cfs_read_unlock(&obd_dev_lock);
406
407         return -1;
408 }
409 EXPORT_SYMBOL(class_name2dev);
410
411 struct obd_device *class_name2obd(const char *name)
412 {
413         int dev = class_name2dev(name);
414
415         if (dev < 0 || dev > class_devno_max())
416                 return NULL;
417         return class_num2obd(dev);
418 }
419 EXPORT_SYMBOL(class_name2obd);
420
421 int class_uuid2dev(struct obd_uuid *uuid)
422 {
423         int i;
424
425         cfs_read_lock(&obd_dev_lock);
426         for (i = 0; i < class_devno_max(); i++) {
427                 struct obd_device *obd = class_num2obd(i);
428
429                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
430                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
431                         cfs_read_unlock(&obd_dev_lock);
432                         return i;
433                 }
434         }
435         cfs_read_unlock(&obd_dev_lock);
436
437         return -1;
438 }
439 EXPORT_SYMBOL(class_uuid2dev);
440
441 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
442 {
443         int dev = class_uuid2dev(uuid);
444         if (dev < 0)
445                 return NULL;
446         return class_num2obd(dev);
447 }
448 EXPORT_SYMBOL(class_uuid2obd);
449
450 /**
451  * Get obd device from ::obd_devs[]
452  *
453  * \param num [in] array index
454  *
455  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
456  *         otherwise return the obd device there.
457  */
458 struct obd_device *class_num2obd(int num)
459 {
460         struct obd_device *obd = NULL;
461
462         if (num < class_devno_max()) {
463                 obd = obd_devs[num];
464                 if (obd == NULL)
465                         return NULL;
466
467                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
468                          "%p obd_magic %08x != %08x\n",
469                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
470                 LASSERTF(obd->obd_minor == num,
471                          "%p obd_minor %0d != %0d\n",
472                          obd, obd->obd_minor, num);
473         }
474
475         return obd;
476 }
477 EXPORT_SYMBOL(class_num2obd);
478
479 void class_obd_list(void)
480 {
481         char *status;
482         int i;
483
484         cfs_read_lock(&obd_dev_lock);
485         for (i = 0; i < class_devno_max(); i++) {
486                 struct obd_device *obd = class_num2obd(i);
487
488                 if (obd == NULL)
489                         continue;
490                 if (obd->obd_stopping)
491                         status = "ST";
492                 else if (obd->obd_set_up)
493                         status = "UP";
494                 else if (obd->obd_attached)
495                         status = "AT";
496                 else
497                         status = "--";
498                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
499                          i, status, obd->obd_type->typ_name,
500                          obd->obd_name, obd->obd_uuid.uuid,
501                          cfs_atomic_read(&obd->obd_refcount));
502         }
503         cfs_read_unlock(&obd_dev_lock);
504         return;
505 }
506
507 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
508    specified, then only the client with that uuid is returned,
509    otherwise any client connected to the tgt is returned. */
510 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
511                                           const char * typ_name,
512                                           struct obd_uuid *grp_uuid)
513 {
514         int i;
515
516         cfs_read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if ((strncmp(obd->obd_type->typ_name, typ_name,
523                              strlen(typ_name)) == 0)) {
524                         if (obd_uuid_equals(tgt_uuid,
525                                             &obd->u.cli.cl_target_uuid) &&
526                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
527                                                          &obd->obd_uuid) : 1)) {
528                                 cfs_read_unlock(&obd_dev_lock);
529                                 return obd;
530                         }
531                 }
532         }
533         cfs_read_unlock(&obd_dev_lock);
534
535         return NULL;
536 }
537 EXPORT_SYMBOL(class_find_client_obd);
538
539 /* Iterate the obd_device list looking devices have grp_uuid. Start
540    searching at *next, and if a device is found, the next index to look
541    at is saved in *next. If next is NULL, then the first matching device
542    will always be returned. */
543 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
544 {
545         int i;
546
547         if (next == NULL)
548                 i = 0;
549         else if (*next >= 0 && *next < class_devno_max())
550                 i = *next;
551         else
552                 return NULL;
553
554         cfs_read_lock(&obd_dev_lock);
555         for (; i < class_devno_max(); i++) {
556                 struct obd_device *obd = class_num2obd(i);
557
558                 if (obd == NULL)
559                         continue;
560                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
561                         if (next != NULL)
562                                 *next = i+1;
563                         cfs_read_unlock(&obd_dev_lock);
564                         return obd;
565                 }
566         }
567         cfs_read_unlock(&obd_dev_lock);
568
569         return NULL;
570 }
571 EXPORT_SYMBOL(class_devices_in_group);
572
573 /**
574  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
575  * adjust sptlrpc settings accordingly.
576  */
577 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
578 {
579         struct obd_device  *obd;
580         const char         *type;
581         int                 i, rc = 0, rc2;
582
583         LASSERT(namelen > 0);
584
585         cfs_read_lock(&obd_dev_lock);
586         for (i = 0; i < class_devno_max(); i++) {
587                 obd = class_num2obd(i);
588
589                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
590                         continue;
591
592                 /* only notify mdc, osc, mdt, ost */
593                 type = obd->obd_type->typ_name;
594                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
595                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
596                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
597                     strcmp(type, LUSTRE_OST_NAME) != 0)
598                         continue;
599
600                 if (strncmp(obd->obd_name, fsname, namelen))
601                         continue;
602
603                 class_incref(obd, __FUNCTION__, obd);
604                 cfs_read_unlock(&obd_dev_lock);
605                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
606                                          sizeof(KEY_SPTLRPC_CONF),
607                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
608                 rc = rc ? rc : rc2;
609                 class_decref(obd, __FUNCTION__, obd);
610                 cfs_read_lock(&obd_dev_lock);
611         }
612         cfs_read_unlock(&obd_dev_lock);
613         return rc;
614 }
615 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
616
617 void obd_cleanup_caches(void)
618 {
619         int rc;
620
621         ENTRY;
622         if (obd_device_cachep) {
623                 rc = cfs_mem_cache_destroy(obd_device_cachep);
624                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
625                 obd_device_cachep = NULL;
626         }
627         if (obdo_cachep) {
628                 rc = cfs_mem_cache_destroy(obdo_cachep);
629                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
630                 obdo_cachep = NULL;
631         }
632         if (import_cachep) {
633                 rc = cfs_mem_cache_destroy(import_cachep);
634                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
635                 import_cachep = NULL;
636         }
637         if (capa_cachep) {
638                 rc = cfs_mem_cache_destroy(capa_cachep);
639                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
640                 capa_cachep = NULL;
641         }
642         EXIT;
643 }
644
645 int obd_init_caches(void)
646 {
647         ENTRY;
648
649         LASSERT(obd_device_cachep == NULL);
650         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
651                                                  sizeof(struct obd_device),
652                                                  0, 0);
653         if (!obd_device_cachep)
654                 GOTO(out, -ENOMEM);
655
656         LASSERT(obdo_cachep == NULL);
657         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
658                                            0, 0);
659         if (!obdo_cachep)
660                 GOTO(out, -ENOMEM);
661
662         LASSERT(import_cachep == NULL);
663         import_cachep = cfs_mem_cache_create("ll_import_cache",
664                                              sizeof(struct obd_import),
665                                              0, 0);
666         if (!import_cachep)
667                 GOTO(out, -ENOMEM);
668
669         LASSERT(capa_cachep == NULL);
670         capa_cachep = cfs_mem_cache_create("capa_cache",
671                                            sizeof(struct obd_capa), 0, 0);
672         if (!capa_cachep)
673                 GOTO(out, -ENOMEM);
674
675         RETURN(0);
676  out:
677         obd_cleanup_caches();
678         RETURN(-ENOMEM);
679
680 }
681
682 /* map connection to client */
683 struct obd_export *class_conn2export(struct lustre_handle *conn)
684 {
685         struct obd_export *export;
686         ENTRY;
687
688         if (!conn) {
689                 CDEBUG(D_CACHE, "looking for null handle\n");
690                 RETURN(NULL);
691         }
692
693         if (conn->cookie == -1) {  /* this means assign a new connection */
694                 CDEBUG(D_CACHE, "want a new connection\n");
695                 RETURN(NULL);
696         }
697
698         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
699         export = class_handle2object(conn->cookie);
700         RETURN(export);
701 }
702 EXPORT_SYMBOL(class_conn2export);
703
704 struct obd_device *class_exp2obd(struct obd_export *exp)
705 {
706         if (exp)
707                 return exp->exp_obd;
708         return NULL;
709 }
710 EXPORT_SYMBOL(class_exp2obd);
711
712 struct obd_device *class_conn2obd(struct lustre_handle *conn)
713 {
714         struct obd_export *export;
715         export = class_conn2export(conn);
716         if (export) {
717                 struct obd_device *obd = export->exp_obd;
718                 class_export_put(export);
719                 return obd;
720         }
721         return NULL;
722 }
723 EXPORT_SYMBOL(class_conn2obd);
724
725 struct obd_import *class_exp2cliimp(struct obd_export *exp)
726 {
727         struct obd_device *obd = exp->exp_obd;
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_exp2cliimp);
733
734 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
735 {
736         struct obd_device *obd = class_conn2obd(conn);
737         if (obd == NULL)
738                 return NULL;
739         return obd->u.cli.cl_import;
740 }
741 EXPORT_SYMBOL(class_conn2cliimp);
742
743 /* Export management functions */
744 static void class_export_destroy(struct obd_export *exp)
745 {
746         struct obd_device *obd = exp->exp_obd;
747         ENTRY;
748
749         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
750         LASSERT(obd != NULL);
751
752         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753                exp->exp_client_uuid.uuid, obd->obd_name);
754
755         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
756         if (exp->exp_connection)
757                 ptlrpc_put_connection_superhack(exp->exp_connection);
758
759         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
760         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
761         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
762         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
763         obd_destroy_export(exp);
764         class_decref(obd, "export", exp);
765
766         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
767         EXIT;
768 }
769
770 static void export_handle_addref(void *export)
771 {
772         class_export_get(export);
773 }
774
775 static struct portals_handle_ops export_handle_ops = {
776         .hop_addref = export_handle_addref,
777         .hop_free   = NULL,
778 };
779
780 struct obd_export *class_export_get(struct obd_export *exp)
781 {
782         cfs_atomic_inc(&exp->exp_refcount);
783         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
784                cfs_atomic_read(&exp->exp_refcount));
785         return exp;
786 }
787 EXPORT_SYMBOL(class_export_get);
788
789 void class_export_put(struct obd_export *exp)
790 {
791         LASSERT(exp != NULL);
792         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
793         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
794                cfs_atomic_read(&exp->exp_refcount) - 1);
795
796         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
797                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
798                 CDEBUG(D_IOCTL, "final put %p/%s\n",
799                        exp, exp->exp_client_uuid.uuid);
800
801                 /* release nid stat refererence */
802                 lprocfs_exp_cleanup(exp);
803
804                 obd_zombie_export_add(exp);
805         }
806 }
807 EXPORT_SYMBOL(class_export_put);
808
809 /* Creates a new export, adds it to the hash table, and returns a
810  * pointer to it. The refcount is 2: one for the hash reference, and
811  * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813                                     struct obd_uuid *cluuid)
814 {
815         struct obd_export *export;
816         cfs_hash_t *hash = NULL;
817         int rc = 0;
818         ENTRY;
819
820         OBD_ALLOC_PTR(export);
821         if (!export)
822                 return ERR_PTR(-ENOMEM);
823
824         export->exp_conn_cnt = 0;
825         export->exp_lock_hash = NULL;
826         export->exp_flock_hash = NULL;
827         cfs_atomic_set(&export->exp_refcount, 2);
828         cfs_atomic_set(&export->exp_rpc_count, 0);
829         cfs_atomic_set(&export->exp_cb_count, 0);
830         cfs_atomic_set(&export->exp_locks_count, 0);
831 #if LUSTRE_TRACKS_LOCK_EXP_REFS
832         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
833         cfs_spin_lock_init(&export->exp_locks_list_guard);
834 #endif
835         cfs_atomic_set(&export->exp_replay_count, 0);
836         export->exp_obd = obd;
837         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
838         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
839         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
840         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
841         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
842         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
843         class_handle_hash(&export->exp_handle, &export_handle_ops);
844         export->exp_last_request_time = cfs_time_current_sec();
845         cfs_spin_lock_init(&export->exp_lock);
846         cfs_spin_lock_init(&export->exp_rpc_lock);
847         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
848         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
849         cfs_spin_lock_init(&export->exp_bl_list_lock);
850         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
851
852         export->exp_sp_peer = LUSTRE_SP_ANY;
853         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
854         export->exp_client_uuid = *cluuid;
855         obd_init_export(export);
856
857         cfs_spin_lock(&obd->obd_dev_lock);
858          /* shouldn't happen, but might race */
859         if (obd->obd_stopping)
860                 GOTO(exit_unlock, rc = -ENODEV);
861
862         hash = cfs_hash_getref(obd->obd_uuid_hash);
863         if (hash == NULL)
864                 GOTO(exit_unlock, rc = -ENODEV);
865         cfs_spin_unlock(&obd->obd_dev_lock);
866
867         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
868                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
869                 if (rc != 0) {
870                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
871                                       obd->obd_name, cluuid->uuid, rc);
872                         GOTO(exit_err, rc = -EALREADY);
873                 }
874         }
875
876         cfs_spin_lock(&obd->obd_dev_lock);
877         if (obd->obd_stopping) {
878                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
879                 GOTO(exit_unlock, rc = -ENODEV);
880         }
881
882         class_incref(obd, "export", export);
883         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
884         cfs_list_add_tail(&export->exp_obd_chain_timed,
885                           &export->exp_obd->obd_exports_timed);
886         export->exp_obd->obd_num_exports++;
887         cfs_spin_unlock(&obd->obd_dev_lock);
888         cfs_hash_putref(hash);
889         RETURN(export);
890
891 exit_unlock:
892         cfs_spin_unlock(&obd->obd_dev_lock);
893 exit_err:
894         if (hash)
895                 cfs_hash_putref(hash);
896         class_handle_unhash(&export->exp_handle);
897         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
898         obd_destroy_export(export);
899         OBD_FREE_PTR(export);
900         return ERR_PTR(rc);
901 }
902 EXPORT_SYMBOL(class_new_export);
903
904 void class_unlink_export(struct obd_export *exp)
905 {
906         class_handle_unhash(&exp->exp_handle);
907
908         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
909         /* delete an uuid-export hashitem from hashtables */
910         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
911                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
912                              &exp->exp_client_uuid,
913                              &exp->exp_uuid_hash);
914
915         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
916         cfs_list_del_init(&exp->exp_obd_chain_timed);
917         exp->exp_obd->obd_num_exports--;
918         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
919         class_export_put(exp);
920 }
921 EXPORT_SYMBOL(class_unlink_export);
922
923 /* Import management functions */
924 void class_import_destroy(struct obd_import *imp)
925 {
926         ENTRY;
927
928         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
929                 imp->imp_obd->obd_name);
930
931         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
932
933         ptlrpc_put_connection_superhack(imp->imp_connection);
934
935         while (!cfs_list_empty(&imp->imp_conn_list)) {
936                 struct obd_import_conn *imp_conn;
937
938                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
939                                           struct obd_import_conn, oic_item);
940                 cfs_list_del_init(&imp_conn->oic_item);
941                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
942                 OBD_FREE(imp_conn, sizeof(*imp_conn));
943         }
944
945         LASSERT(imp->imp_sec == NULL);
946         class_decref(imp->imp_obd, "import", imp);
947         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
948         EXIT;
949 }
950
951 static void import_handle_addref(void *import)
952 {
953         class_import_get(import);
954 }
955
956 static struct portals_handle_ops import_handle_ops = {
957         .hop_addref = import_handle_addref,
958         .hop_free   = NULL,
959 };
960
961 struct obd_import *class_import_get(struct obd_import *import)
962 {
963         cfs_atomic_inc(&import->imp_refcount);
964         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
965                cfs_atomic_read(&import->imp_refcount),
966                import->imp_obd->obd_name);
967         return import;
968 }
969 EXPORT_SYMBOL(class_import_get);
970
971 void class_import_put(struct obd_import *imp)
972 {
973         ENTRY;
974
975         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
976         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
977
978         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
979                cfs_atomic_read(&imp->imp_refcount) - 1,
980                imp->imp_obd->obd_name);
981
982         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
983                 CDEBUG(D_INFO, "final put import %p\n", imp);
984                 obd_zombie_import_add(imp);
985         }
986
987         /* catch possible import put race */
988         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
989         EXIT;
990 }
991 EXPORT_SYMBOL(class_import_put);
992
993 static void init_imp_at(struct imp_at *at) {
994         int i;
995         at_init(&at->iat_net_latency, 0, 0);
996         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
997                 /* max service estimates are tracked on the server side, so
998                    don't use the AT history here, just use the last reported
999                    val. (But keep hist for proc histogram, worst_ever) */
1000                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1001                         AT_FLG_NOHIST);
1002         }
1003 }
1004
1005 struct obd_import *class_new_import(struct obd_device *obd)
1006 {
1007         struct obd_import *imp;
1008
1009         OBD_ALLOC(imp, sizeof(*imp));
1010         if (imp == NULL)
1011                 return NULL;
1012
1013         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1014         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1015         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1016         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1017         cfs_spin_lock_init(&imp->imp_lock);
1018         imp->imp_last_success_conn = 0;
1019         imp->imp_state = LUSTRE_IMP_NEW;
1020         imp->imp_obd = class_incref(obd, "import", imp);
1021         cfs_mutex_init(&imp->imp_sec_mutex);
1022         cfs_waitq_init(&imp->imp_recovery_waitq);
1023
1024         cfs_atomic_set(&imp->imp_refcount, 2);
1025         cfs_atomic_set(&imp->imp_unregistering, 0);
1026         cfs_atomic_set(&imp->imp_inflight, 0);
1027         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1028         cfs_atomic_set(&imp->imp_inval_count, 0);
1029         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1030         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1031         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1032         init_imp_at(&imp->imp_at);
1033
1034         /* the default magic is V2, will be used in connect RPC, and
1035          * then adjusted according to the flags in request/reply. */
1036         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1037
1038         return imp;
1039 }
1040 EXPORT_SYMBOL(class_new_import);
1041
1042 void class_destroy_import(struct obd_import *import)
1043 {
1044         LASSERT(import != NULL);
1045         LASSERT(import != LP_POISON);
1046
1047         class_handle_unhash(&import->imp_handle);
1048
1049         cfs_spin_lock(&import->imp_lock);
1050         import->imp_generation++;
1051         cfs_spin_unlock(&import->imp_lock);
1052         class_import_put(import);
1053 }
1054 EXPORT_SYMBOL(class_destroy_import);
1055
1056 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1057
1058 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1059 {
1060         cfs_spin_lock(&exp->exp_locks_list_guard);
1061
1062         LASSERT(lock->l_exp_refs_nr >= 0);
1063
1064         if (lock->l_exp_refs_target != NULL &&
1065             lock->l_exp_refs_target != exp) {
1066                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1067                               exp, lock, lock->l_exp_refs_target);
1068         }
1069         if ((lock->l_exp_refs_nr ++) == 0) {
1070                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1071                 lock->l_exp_refs_target = exp;
1072         }
1073         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1074                lock, exp, lock->l_exp_refs_nr);
1075         cfs_spin_unlock(&exp->exp_locks_list_guard);
1076 }
1077 EXPORT_SYMBOL(__class_export_add_lock_ref);
1078
1079 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1080 {
1081         cfs_spin_lock(&exp->exp_locks_list_guard);
1082         LASSERT(lock->l_exp_refs_nr > 0);
1083         if (lock->l_exp_refs_target != exp) {
1084                 LCONSOLE_WARN("lock %p, "
1085                               "mismatching export pointers: %p, %p\n",
1086                               lock, lock->l_exp_refs_target, exp);
1087         }
1088         if (-- lock->l_exp_refs_nr == 0) {
1089                 cfs_list_del_init(&lock->l_exp_refs_link);
1090                 lock->l_exp_refs_target = NULL;
1091         }
1092         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093                lock, exp, lock->l_exp_refs_nr);
1094         cfs_spin_unlock(&exp->exp_locks_list_guard);
1095 }
1096 EXPORT_SYMBOL(__class_export_del_lock_ref);
1097 #endif
1098
1099 /* A connection defines an export context in which preallocation can
1100    be managed. This releases the export pointer reference, and returns
1101    the export handle, so the export refcount is 1 when this function
1102    returns. */
1103 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1104                   struct obd_uuid *cluuid)
1105 {
1106         struct obd_export *export;
1107         LASSERT(conn != NULL);
1108         LASSERT(obd != NULL);
1109         LASSERT(cluuid != NULL);
1110         ENTRY;
1111
1112         export = class_new_export(obd, cluuid);
1113         if (IS_ERR(export))
1114                 RETURN(PTR_ERR(export));
1115
1116         conn->cookie = export->exp_handle.h_cookie;
1117         class_export_put(export);
1118
1119         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1120                cluuid->uuid, conn->cookie);
1121         RETURN(0);
1122 }
1123 EXPORT_SYMBOL(class_connect);
1124
1125 /* if export is involved in recovery then clean up related things */
1126 void class_export_recovery_cleanup(struct obd_export *exp)
1127 {
1128         struct obd_device *obd = exp->exp_obd;
1129
1130         cfs_spin_lock(&obd->obd_recovery_task_lock);
1131         if (exp->exp_delayed)
1132                 obd->obd_delayed_clients--;
1133         if (obd->obd_recovering && exp->exp_in_recovery) {
1134                 cfs_spin_lock(&exp->exp_lock);
1135                 exp->exp_in_recovery = 0;
1136                 cfs_spin_unlock(&exp->exp_lock);
1137                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1138                 cfs_atomic_dec(&obd->obd_connected_clients);
1139         }
1140         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1141         /** Cleanup req replay fields */
1142         if (exp->exp_req_replay_needed) {
1143                 cfs_spin_lock(&exp->exp_lock);
1144                 exp->exp_req_replay_needed = 0;
1145                 cfs_spin_unlock(&exp->exp_lock);
1146                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1147                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1148         }
1149         /** Cleanup lock replay data */
1150         if (exp->exp_lock_replay_needed) {
1151                 cfs_spin_lock(&exp->exp_lock);
1152                 exp->exp_lock_replay_needed = 0;
1153                 cfs_spin_unlock(&exp->exp_lock);
1154                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1155                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1156         }
1157 }
1158
1159 /* This function removes 1-3 references from the export:
1160  * 1 - for export pointer passed
1161  * and if disconnect really need
1162  * 2 - removing from hash
1163  * 3 - in client_unlink_export
1164  * The export pointer passed to this function can destroyed */
1165 int class_disconnect(struct obd_export *export)
1166 {
1167         int already_disconnected;
1168         ENTRY;
1169
1170         if (export == NULL) {
1171                 CWARN("attempting to free NULL export %p\n", export);
1172                 RETURN(-EINVAL);
1173         }
1174
1175         cfs_spin_lock(&export->exp_lock);
1176         already_disconnected = export->exp_disconnected;
1177         export->exp_disconnected = 1;
1178         cfs_spin_unlock(&export->exp_lock);
1179
1180         /* class_cleanup(), abort_recovery(), and class_fail_export()
1181          * all end up in here, and if any of them race we shouldn't
1182          * call extra class_export_puts(). */
1183         if (already_disconnected) {
1184                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1185                 GOTO(no_disconn, already_disconnected);
1186         }
1187
1188         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1189                export->exp_handle.h_cookie);
1190
1191         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1192                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1193                              &export->exp_connection->c_peer.nid,
1194                              &export->exp_nid_hash);
1195
1196         class_export_recovery_cleanup(export);
1197         class_unlink_export(export);
1198 no_disconn:
1199         class_export_put(export);
1200         RETURN(0);
1201 }
1202 EXPORT_SYMBOL(class_disconnect);
1203
1204 /* Return non-zero for a fully connected export */
1205 int class_connected_export(struct obd_export *exp)
1206 {
1207         if (exp) {
1208                 int connected;
1209                 cfs_spin_lock(&exp->exp_lock);
1210                 connected = (exp->exp_conn_cnt > 0);
1211                 cfs_spin_unlock(&exp->exp_lock);
1212                 return connected;
1213         }
1214         return 0;
1215 }
1216 EXPORT_SYMBOL(class_connected_export);
1217
1218 static void class_disconnect_export_list(cfs_list_t *list,
1219                                          enum obd_option flags)
1220 {
1221         int rc;
1222         struct obd_export *exp;
1223         ENTRY;
1224
1225         /* It's possible that an export may disconnect itself, but
1226          * nothing else will be added to this list. */
1227         while (!cfs_list_empty(list)) {
1228                 exp = cfs_list_entry(list->next, struct obd_export,
1229                                      exp_obd_chain);
1230                 /* need for safe call CDEBUG after obd_disconnect */
1231                 class_export_get(exp);
1232
1233                 cfs_spin_lock(&exp->exp_lock);
1234                 exp->exp_flags = flags;
1235                 cfs_spin_unlock(&exp->exp_lock);
1236
1237                 if (obd_uuid_equals(&exp->exp_client_uuid,
1238                                     &exp->exp_obd->obd_uuid)) {
1239                         CDEBUG(D_HA,
1240                                "exp %p export uuid == obd uuid, don't discon\n",
1241                                exp);
1242                         /* Need to delete this now so we don't end up pointing
1243                          * to work_list later when this export is cleaned up. */
1244                         cfs_list_del_init(&exp->exp_obd_chain);
1245                         class_export_put(exp);
1246                         continue;
1247                 }
1248
1249                 class_export_get(exp);
1250                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1251                        "last request at "CFS_TIME_T"\n",
1252                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1253                        exp, exp->exp_last_request_time);
1254                 /* release one export reference anyway */
1255                 rc = obd_disconnect(exp);
1256
1257                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1258                        obd_export_nid2str(exp), exp, rc);
1259                 class_export_put(exp);
1260         }
1261         EXIT;
1262 }
1263
1264 void class_disconnect_exports(struct obd_device *obd)
1265 {
1266         cfs_list_t work_list;
1267         ENTRY;
1268
1269         /* Move all of the exports from obd_exports to a work list, en masse. */
1270         CFS_INIT_LIST_HEAD(&work_list);
1271         cfs_spin_lock(&obd->obd_dev_lock);
1272         cfs_list_splice_init(&obd->obd_exports, &work_list);
1273         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1274         cfs_spin_unlock(&obd->obd_dev_lock);
1275
1276         if (!cfs_list_empty(&work_list)) {
1277                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278                        "disconnecting them\n", obd->obd_minor, obd);
1279                 class_disconnect_export_list(&work_list,
1280                                              exp_flags_from_obd(obd));
1281         } else
1282                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283                        obd->obd_minor, obd);
1284         EXIT;
1285 }
1286 EXPORT_SYMBOL(class_disconnect_exports);
1287
1288 /* Remove exports that have not completed recovery.
1289  */
1290 void class_disconnect_stale_exports(struct obd_device *obd,
1291                                     int (*test_export)(struct obd_export *))
1292 {
1293         cfs_list_t work_list;
1294         struct obd_export *exp, *n;
1295         int evicted = 0;
1296         ENTRY;
1297
1298         CFS_INIT_LIST_HEAD(&work_list);
1299         cfs_spin_lock(&obd->obd_dev_lock);
1300         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1301                                      exp_obd_chain) {
1302                 /* don't count self-export as client */
1303                 if (obd_uuid_equals(&exp->exp_client_uuid,
1304                                     &exp->exp_obd->obd_uuid))
1305                         continue;
1306
1307                 /* don't evict clients which have no slot in last_rcvd
1308                  * (e.g. lightweight connection) */
1309                 if (exp->exp_target_data.ted_lr_idx == -1)
1310                         continue;
1311
1312                 cfs_spin_lock(&exp->exp_lock);
1313                 if (test_export(exp)) {
1314                         cfs_spin_unlock(&exp->exp_lock);
1315                         continue;
1316                 }
1317                 exp->exp_failed = 1;
1318                 cfs_spin_unlock(&exp->exp_lock);
1319
1320                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1321                 evicted++;
1322                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1323                        obd->obd_name, exp->exp_client_uuid.uuid,
1324                        exp->exp_connection == NULL ? "<unknown>" :
1325                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1326                 print_export_data(exp, "EVICTING", 0);
1327         }
1328         cfs_spin_unlock(&obd->obd_dev_lock);
1329
1330         if (evicted) {
1331                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332                               obd->obd_name, evicted);
1333                 obd->obd_stale_clients += evicted;
1334         }
1335         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1336                                                  OBD_OPT_ABORT_RECOV);
1337         EXIT;
1338 }
1339 EXPORT_SYMBOL(class_disconnect_stale_exports);
1340
1341 void class_fail_export(struct obd_export *exp)
1342 {
1343         int rc, already_failed;
1344
1345         cfs_spin_lock(&exp->exp_lock);
1346         already_failed = exp->exp_failed;
1347         exp->exp_failed = 1;
1348         cfs_spin_unlock(&exp->exp_lock);
1349
1350         if (already_failed) {
1351                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1352                        exp, exp->exp_client_uuid.uuid);
1353                 return;
1354         }
1355
1356         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1357                exp, exp->exp_client_uuid.uuid);
1358
1359         if (obd_dump_on_timeout)
1360                 libcfs_debug_dumplog();
1361
1362         /* need for safe call CDEBUG after obd_disconnect */
1363         class_export_get(exp);
1364
1365         /* Most callers into obd_disconnect are removing their own reference
1366          * (request, for example) in addition to the one from the hash table.
1367          * We don't have such a reference here, so make one. */
1368         class_export_get(exp);
1369         rc = obd_disconnect(exp);
1370         if (rc)
1371                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1372         else
1373                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1374                        exp, exp->exp_client_uuid.uuid);
1375         class_export_put(exp);
1376 }
1377 EXPORT_SYMBOL(class_fail_export);
1378
1379 char *obd_export_nid2str(struct obd_export *exp)
1380 {
1381         if (exp->exp_connection != NULL)
1382                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1383
1384         return "(no nid)";
1385 }
1386 EXPORT_SYMBOL(obd_export_nid2str);
1387
1388 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1389 {
1390         struct obd_export *doomed_exp = NULL;
1391         int exports_evicted = 0;
1392
1393         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1394
1395         do {
1396                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1397                 if (doomed_exp == NULL)
1398                         break;
1399
1400                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1401                          "nid %s found, wanted nid %s, requested nid %s\n",
1402                          obd_export_nid2str(doomed_exp),
1403                          libcfs_nid2str(nid_key), nid);
1404                 LASSERTF(doomed_exp != obd->obd_self_export,
1405                          "self-export is hashed by NID?\n");
1406                 exports_evicted++;
1407                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1408                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1409                        exports_evicted);
1410                 class_fail_export(doomed_exp);
1411                 class_export_put(doomed_exp);
1412         } while (1);
1413
1414         if (!exports_evicted)
1415                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1416                        obd->obd_name, nid);
1417         return exports_evicted;
1418 }
1419 EXPORT_SYMBOL(obd_export_evict_by_nid);
1420
1421 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1422 {
1423         struct obd_export *doomed_exp = NULL;
1424         struct obd_uuid doomed_uuid;
1425         int exports_evicted = 0;
1426
1427         obd_str2uuid(&doomed_uuid, uuid);
1428         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1429                 CERROR("%s: can't evict myself\n", obd->obd_name);
1430                 return exports_evicted;
1431         }
1432
1433         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1434
1435         if (doomed_exp == NULL) {
1436                 CERROR("%s: can't disconnect %s: no exports found\n",
1437                        obd->obd_name, uuid);
1438         } else {
1439                 CWARN("%s: evicting %s at adminstrative request\n",
1440                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1441                 class_fail_export(doomed_exp);
1442                 class_export_put(doomed_exp);
1443                 exports_evicted++;
1444         }
1445
1446         return exports_evicted;
1447 }
1448 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1449
1450 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1451 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1452 EXPORT_SYMBOL(class_export_dump_hook);
1453 #endif
1454
1455 static void print_export_data(struct obd_export *exp, const char *status,
1456                               int locks)
1457 {
1458         struct ptlrpc_reply_state *rs;
1459         struct ptlrpc_reply_state *first_reply = NULL;
1460         int nreplies = 0;
1461
1462         cfs_spin_lock(&exp->exp_lock);
1463         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1464                                 rs_exp_list) {
1465                 if (nreplies == 0)
1466                         first_reply = rs;
1467                 nreplies++;
1468         }
1469         cfs_spin_unlock(&exp->exp_lock);
1470
1471         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1472                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1473                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1474                cfs_atomic_read(&exp->exp_rpc_count),
1475                cfs_atomic_read(&exp->exp_cb_count),
1476                cfs_atomic_read(&exp->exp_locks_count),
1477                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1478                nreplies, first_reply, nreplies > 3 ? "..." : "",
1479                exp->exp_last_committed);
1480 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1481         if (locks && class_export_dump_hook != NULL)
1482                 class_export_dump_hook(exp);
1483 #endif
1484 }
1485
1486 void dump_exports(struct obd_device *obd, int locks)
1487 {
1488         struct obd_export *exp;
1489
1490         cfs_spin_lock(&obd->obd_dev_lock);
1491         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1492                 print_export_data(exp, "ACTIVE", locks);
1493         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1494                 print_export_data(exp, "UNLINKED", locks);
1495         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1496                 print_export_data(exp, "DELAYED", locks);
1497         cfs_spin_unlock(&obd->obd_dev_lock);
1498         cfs_spin_lock(&obd_zombie_impexp_lock);
1499         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1500                 print_export_data(exp, "ZOMBIE", locks);
1501         cfs_spin_unlock(&obd_zombie_impexp_lock);
1502 }
1503 EXPORT_SYMBOL(dump_exports);
1504
1505 void obd_exports_barrier(struct obd_device *obd)
1506 {
1507         int waited = 2;
1508         LASSERT(cfs_list_empty(&obd->obd_exports));
1509         cfs_spin_lock(&obd->obd_dev_lock);
1510         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1511                 cfs_spin_unlock(&obd->obd_dev_lock);
1512                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1513                                                    cfs_time_seconds(waited));
1514                 if (waited > 5 && IS_PO2(waited)) {
1515                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1516                                       "more than %d seconds. "
1517                                       "The obd refcount = %d. Is it stuck?\n",
1518                                       obd->obd_name, waited,
1519                                       cfs_atomic_read(&obd->obd_refcount));
1520                         dump_exports(obd, 1);
1521                 }
1522                 waited *= 2;
1523                 cfs_spin_lock(&obd->obd_dev_lock);
1524         }
1525         cfs_spin_unlock(&obd->obd_dev_lock);
1526 }
1527 EXPORT_SYMBOL(obd_exports_barrier);
1528
1529 /* Total amount of zombies to be destroyed */
1530 static int zombies_count = 0;
1531
1532 /**
1533  * kill zombie imports and exports
1534  */
1535 void obd_zombie_impexp_cull(void)
1536 {
1537         struct obd_import *import;
1538         struct obd_export *export;
1539         ENTRY;
1540
1541         do {
1542                 cfs_spin_lock(&obd_zombie_impexp_lock);
1543
1544                 import = NULL;
1545                 if (!cfs_list_empty(&obd_zombie_imports)) {
1546                         import = cfs_list_entry(obd_zombie_imports.next,
1547                                                 struct obd_import,
1548                                                 imp_zombie_chain);
1549                         cfs_list_del_init(&import->imp_zombie_chain);
1550                 }
1551
1552                 export = NULL;
1553                 if (!cfs_list_empty(&obd_zombie_exports)) {
1554                         export = cfs_list_entry(obd_zombie_exports.next,
1555                                                 struct obd_export,
1556                                                 exp_obd_chain);
1557                         cfs_list_del_init(&export->exp_obd_chain);
1558                 }
1559
1560                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1561
1562                 if (import != NULL) {
1563                         class_import_destroy(import);
1564                         cfs_spin_lock(&obd_zombie_impexp_lock);
1565                         zombies_count--;
1566                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1567                 }
1568
1569                 if (export != NULL) {
1570                         class_export_destroy(export);
1571                         cfs_spin_lock(&obd_zombie_impexp_lock);
1572                         zombies_count--;
1573                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1574                 }
1575
1576                 cfs_cond_resched();
1577         } while (import != NULL || export != NULL);
1578         EXIT;
1579 }
1580
1581 static cfs_completion_t         obd_zombie_start;
1582 static cfs_completion_t         obd_zombie_stop;
1583 static unsigned long            obd_zombie_flags;
1584 static cfs_waitq_t              obd_zombie_waitq;
1585 static pid_t                    obd_zombie_pid;
1586
1587 enum {
1588         OBD_ZOMBIE_STOP   = 1 << 1
1589 };
1590
1591 /**
1592  * check for work for kill zombie import/export thread.
1593  */
1594 static int obd_zombie_impexp_check(void *arg)
1595 {
1596         int rc;
1597
1598         cfs_spin_lock(&obd_zombie_impexp_lock);
1599         rc = (zombies_count == 0) &&
1600              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1601         cfs_spin_unlock(&obd_zombie_impexp_lock);
1602
1603         RETURN(rc);
1604 }
1605
1606 /**
1607  * Add export to the obd_zombe thread and notify it.
1608  */
1609 static void obd_zombie_export_add(struct obd_export *exp) {
1610         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1611         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1612         cfs_list_del_init(&exp->exp_obd_chain);
1613         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1614         cfs_spin_lock(&obd_zombie_impexp_lock);
1615         zombies_count++;
1616         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1617         cfs_spin_unlock(&obd_zombie_impexp_lock);
1618
1619         obd_zombie_impexp_notify();
1620 }
1621
1622 /**
1623  * Add import to the obd_zombe thread and notify it.
1624  */
1625 static void obd_zombie_import_add(struct obd_import *imp) {
1626         LASSERT(imp->imp_sec == NULL);
1627         LASSERT(imp->imp_rq_pool == NULL);
1628         cfs_spin_lock(&obd_zombie_impexp_lock);
1629         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1630         zombies_count++;
1631         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1632         cfs_spin_unlock(&obd_zombie_impexp_lock);
1633
1634         obd_zombie_impexp_notify();
1635 }
1636
1637 /**
1638  * notify import/export destroy thread about new zombie.
1639  */
1640 static void obd_zombie_impexp_notify(void)
1641 {
1642         /*
1643          * Make sure obd_zomebie_impexp_thread get this notification.
1644          * It is possible this signal only get by obd_zombie_barrier, and
1645          * barrier gulps this notification and sleeps away and hangs ensues
1646          */
1647         cfs_waitq_broadcast(&obd_zombie_waitq);
1648 }
1649
1650 /**
1651  * check whether obd_zombie is idle
1652  */
1653 static int obd_zombie_is_idle(void)
1654 {
1655         int rc;
1656
1657         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1658         cfs_spin_lock(&obd_zombie_impexp_lock);
1659         rc = (zombies_count == 0);
1660         cfs_spin_unlock(&obd_zombie_impexp_lock);
1661         return rc;
1662 }
1663
1664 /**
1665  * wait when obd_zombie import/export queues become empty
1666  */
1667 void obd_zombie_barrier(void)
1668 {
1669         struct l_wait_info lwi = { 0 };
1670
1671         if (obd_zombie_pid == cfs_curproc_pid())
1672                 /* don't wait for myself */
1673                 return;
1674         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1675 }
1676 EXPORT_SYMBOL(obd_zombie_barrier);
1677
1678 #ifdef __KERNEL__
1679
1680 /**
1681  * destroy zombie export/import thread.
1682  */
1683 static int obd_zombie_impexp_thread(void *unused)
1684 {
1685         int rc;
1686
1687         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1688                 cfs_complete(&obd_zombie_start);
1689                 RETURN(rc);
1690         }
1691
1692         cfs_complete(&obd_zombie_start);
1693
1694         obd_zombie_pid = cfs_curproc_pid();
1695
1696         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1697                 struct l_wait_info lwi = { 0 };
1698
1699                 l_wait_event(obd_zombie_waitq,
1700                              !obd_zombie_impexp_check(NULL), &lwi);
1701                 obd_zombie_impexp_cull();
1702
1703                 /*
1704                  * Notify obd_zombie_barrier callers that queues
1705                  * may be empty.
1706                  */
1707                 cfs_waitq_signal(&obd_zombie_waitq);
1708         }
1709
1710         cfs_complete(&obd_zombie_stop);
1711
1712         RETURN(0);
1713 }
1714
1715 #else /* ! KERNEL */
1716
1717 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1718 static void *obd_zombie_impexp_work_cb;
1719 static void *obd_zombie_impexp_idle_cb;
1720
1721 int obd_zombie_impexp_kill(void *arg)
1722 {
1723         int rc = 0;
1724
1725         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1726                 obd_zombie_impexp_cull();
1727                 rc = 1;
1728         }
1729         cfs_atomic_dec(&zombie_recur);
1730         return rc;
1731 }
1732
1733 #endif
1734
1735 /**
1736  * start destroy zombie import/export thread
1737  */
1738 int obd_zombie_impexp_init(void)
1739 {
1740         int rc;
1741
1742         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1743         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1744         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1745         cfs_init_completion(&obd_zombie_start);
1746         cfs_init_completion(&obd_zombie_stop);
1747         cfs_waitq_init(&obd_zombie_waitq);
1748         obd_zombie_pid = 0;
1749
1750 #ifdef __KERNEL__
1751         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1752         if (rc < 0)
1753                 RETURN(rc);
1754
1755         cfs_wait_for_completion(&obd_zombie_start);
1756 #else
1757
1758         obd_zombie_impexp_work_cb =
1759                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1760                                                  &obd_zombie_impexp_kill, NULL);
1761
1762         obd_zombie_impexp_idle_cb =
1763                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1764                                                  &obd_zombie_impexp_check, NULL);
1765         rc = 0;
1766 #endif
1767         RETURN(rc);
1768 }
1769 /**
1770  * stop destroy zombie import/export thread
1771  */
1772 void obd_zombie_impexp_stop(void)
1773 {
1774         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1775         obd_zombie_impexp_notify();
1776 #ifdef __KERNEL__
1777         cfs_wait_for_completion(&obd_zombie_stop);
1778 #else
1779         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1780         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1781 #endif
1782 }
1783
1784 /***** Kernel-userspace comm helpers *******/
1785
1786 /* Get length of entire message, including header */
1787 int kuc_len(int payload_len)
1788 {
1789         return sizeof(struct kuc_hdr) + payload_len;
1790 }
1791 EXPORT_SYMBOL(kuc_len);
1792
1793 /* Get a pointer to kuc header, given a ptr to the payload
1794  * @param p Pointer to payload area
1795  * @returns Pointer to kuc header
1796  */
1797 struct kuc_hdr * kuc_ptr(void *p)
1798 {
1799         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1800         LASSERT(lh->kuc_magic == KUC_MAGIC);
1801         return lh;
1802 }
1803 EXPORT_SYMBOL(kuc_ptr);
1804
1805 /* Test if payload is part of kuc message
1806  * @param p Pointer to payload area
1807  * @returns boolean
1808  */
1809 int kuc_ispayload(void *p)
1810 {
1811         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1812
1813         if (kh->kuc_magic == KUC_MAGIC)
1814                 return 1;
1815         else
1816                 return 0;
1817 }
1818 EXPORT_SYMBOL(kuc_ispayload);
1819
1820 /* Alloc space for a message, and fill in header
1821  * @return Pointer to payload area
1822  */
1823 void *kuc_alloc(int payload_len, int transport, int type)
1824 {
1825         struct kuc_hdr *lh;
1826         int len = kuc_len(payload_len);
1827
1828         OBD_ALLOC(lh, len);
1829         if (lh == NULL)
1830                 return ERR_PTR(-ENOMEM);
1831
1832         lh->kuc_magic = KUC_MAGIC;
1833         lh->kuc_transport = transport;
1834         lh->kuc_msgtype = type;
1835         lh->kuc_msglen = len;
1836
1837         return (void *)(lh + 1);
1838 }
1839 EXPORT_SYMBOL(kuc_alloc);
1840
1841 /* Takes pointer to payload area */
1842 inline void kuc_free(void *p, int payload_len)
1843 {
1844         struct kuc_hdr *lh = kuc_ptr(p);
1845         OBD_FREE(lh, kuc_len(payload_len));
1846 }
1847 EXPORT_SYMBOL(kuc_free);
1848
1849
1850