Whamcloud - gitweb
LU-735 Remove fixme() macro
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
55
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
60
61 cfs_list_t      obd_zombie_imports;
62 cfs_list_t      obd_zombie_exports;
63 cfs_spinlock_t  obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68                               const char *status, int locks);
69
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142 EXPORT_SYMBOL(class_get_type);
143
144 void class_put_type(struct obd_type *type)
145 {
146         LASSERT(type);
147         cfs_spin_lock(&type->obd_type_lock);
148         type->typ_refcnt--;
149         cfs_module_put(type->typ_dt_ops->o_owner);
150         cfs_spin_unlock(&type->obd_type_lock);
151 }
152 EXPORT_SYMBOL(class_put_type);
153
154 #define CLASS_MAX_NAME 1024
155
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157                         struct lprocfs_vars *vars, const char *name,
158                         struct lu_device_type *ldt)
159 {
160         struct obd_type *type;
161         int rc = 0;
162         ENTRY;
163
164         /* sanity check */
165         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167         if (class_search_type(name)) {
168                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169                 RETURN(-EEXIST);
170         }
171
172         rc = -ENOMEM;
173         OBD_ALLOC(type, sizeof(*type));
174         if (type == NULL)
175                 RETURN(rc);
176
177         OBD_ALLOC_PTR(type->typ_dt_ops);
178         OBD_ALLOC_PTR(type->typ_md_ops);
179         OBD_ALLOC(type->typ_name, strlen(name) + 1);
180
181         if (type->typ_dt_ops == NULL ||
182             type->typ_md_ops == NULL ||
183             type->typ_name == NULL)
184                 GOTO (failed, rc);
185
186         *(type->typ_dt_ops) = *dt_ops;
187         /* md_ops is optional */
188         if (md_ops)
189                 *(type->typ_md_ops) = *md_ops;
190         strcpy(type->typ_name, name);
191         cfs_spin_lock_init(&type->obd_type_lock);
192
193 #ifdef LPROCFS
194         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195                                               vars, type);
196         if (IS_ERR(type->typ_procroot)) {
197                 rc = PTR_ERR(type->typ_procroot);
198                 type->typ_procroot = NULL;
199                 GOTO (failed, rc);
200         }
201 #endif
202         if (ldt != NULL) {
203                 type->typ_lu = ldt;
204                 rc = lu_device_type_init(ldt);
205                 if (rc != 0)
206                         GOTO (failed, rc);
207         }
208
209         cfs_spin_lock(&obd_types_lock);
210         cfs_list_add(&type->typ_chain, &obd_types);
211         cfs_spin_unlock(&obd_types_lock);
212
213         RETURN (0);
214
215  failed:
216         if (type->typ_name != NULL)
217                 OBD_FREE(type->typ_name, strlen(name) + 1);
218         if (type->typ_md_ops != NULL)
219                 OBD_FREE_PTR(type->typ_md_ops);
220         if (type->typ_dt_ops != NULL)
221                 OBD_FREE_PTR(type->typ_dt_ops);
222         OBD_FREE(type, sizeof(*type));
223         RETURN(rc);
224 }
225 EXPORT_SYMBOL(class_register_type);
226
227 int class_unregister_type(const char *name)
228 {
229         struct obd_type *type = class_search_type(name);
230         ENTRY;
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 RETURN(-EINVAL);
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 RETURN(-EBUSY);
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         cfs_spin_lock(&obd_types_lock);
254         cfs_list_del(&type->typ_chain);
255         cfs_spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         RETURN(0);
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *         pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists, won't add\n", name);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         cfs_write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 result = ERR_PTR(-EOVERFLOW);
340         }
341
342         if (IS_ERR(result)) {
343                 obd_device_free(newdev);
344                 class_put_type(type);
345         } else {
346                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                        result->obd_name, result);
348         }
349         return result;
350 }
351
352 void class_release_dev(struct obd_device *obd)
353 {
354         struct obd_type *obd_type = obd->obd_type;
355
356         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360         LASSERT(obd_type != NULL);
361
362         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
363                obd->obd_name,obd->obd_type->typ_name);
364
365         cfs_write_lock(&obd_dev_lock);
366         obd_devs[obd->obd_minor] = NULL;
367         cfs_write_unlock(&obd_dev_lock);
368         obd_device_free(obd);
369
370         class_put_type(obd_type);
371 }
372
373 int class_name2dev(const char *name)
374 {
375         int i;
376
377         if (!name)
378                 return -1;
379
380         cfs_read_lock(&obd_dev_lock);
381         for (i = 0; i < class_devno_max(); i++) {
382                 struct obd_device *obd = class_num2obd(i);
383
384                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385                         /* Make sure we finished attaching before we give
386                            out any references */
387                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388                         if (obd->obd_attached) {
389                                 cfs_read_unlock(&obd_dev_lock);
390                                 return i;
391                         }
392                         break;
393                 }
394         }
395         cfs_read_unlock(&obd_dev_lock);
396
397         return -1;
398 }
399 EXPORT_SYMBOL(class_name2dev);
400
401 struct obd_device *class_name2obd(const char *name)
402 {
403         int dev = class_name2dev(name);
404
405         if (dev < 0 || dev > class_devno_max())
406                 return NULL;
407         return class_num2obd(dev);
408 }
409 EXPORT_SYMBOL(class_name2obd);
410
411 int class_uuid2dev(struct obd_uuid *uuid)
412 {
413         int i;
414
415         cfs_read_lock(&obd_dev_lock);
416         for (i = 0; i < class_devno_max(); i++) {
417                 struct obd_device *obd = class_num2obd(i);
418
419                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421                         cfs_read_unlock(&obd_dev_lock);
422                         return i;
423                 }
424         }
425         cfs_read_unlock(&obd_dev_lock);
426
427         return -1;
428 }
429 EXPORT_SYMBOL(class_uuid2dev);
430
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 {
433         int dev = class_uuid2dev(uuid);
434         if (dev < 0)
435                 return NULL;
436         return class_num2obd(dev);
437 }
438 EXPORT_SYMBOL(class_uuid2obd);
439
440 /**
441  * Get obd device from ::obd_devs[]
442  *
443  * \param num [in] array index
444  *
445  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446  *         otherwise return the obd device there.
447  */
448 struct obd_device *class_num2obd(int num)
449 {
450         struct obd_device *obd = NULL;
451
452         if (num < class_devno_max()) {
453                 obd = obd_devs[num];
454                 if (obd == NULL)
455                         return NULL;
456
457                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458                          "%p obd_magic %08x != %08x\n",
459                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460                 LASSERTF(obd->obd_minor == num,
461                          "%p obd_minor %0d != %0d\n",
462                          obd, obd->obd_minor, num);
463         }
464
465         return obd;
466 }
467 EXPORT_SYMBOL(class_num2obd);
468
469 void class_obd_list(void)
470 {
471         char *status;
472         int i;
473
474         cfs_read_lock(&obd_dev_lock);
475         for (i = 0; i < class_devno_max(); i++) {
476                 struct obd_device *obd = class_num2obd(i);
477
478                 if (obd == NULL)
479                         continue;
480                 if (obd->obd_stopping)
481                         status = "ST";
482                 else if (obd->obd_set_up)
483                         status = "UP";
484                 else if (obd->obd_attached)
485                         status = "AT";
486                 else
487                         status = "--";
488                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489                          i, status, obd->obd_type->typ_name,
490                          obd->obd_name, obd->obd_uuid.uuid,
491                          cfs_atomic_read(&obd->obd_refcount));
492         }
493         cfs_read_unlock(&obd_dev_lock);
494         return;
495 }
496
497 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
498    specified, then only the client with that uuid is returned,
499    otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501                                           const char * typ_name,
502                                           struct obd_uuid *grp_uuid)
503 {
504         int i;
505
506         cfs_read_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd == NULL)
511                         continue;
512                 if ((strncmp(obd->obd_type->typ_name, typ_name,
513                              strlen(typ_name)) == 0)) {
514                         if (obd_uuid_equals(tgt_uuid,
515                                             &obd->u.cli.cl_target_uuid) &&
516                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
517                                                          &obd->obd_uuid) : 1)) {
518                                 cfs_read_unlock(&obd_dev_lock);
519                                 return obd;
520                         }
521                 }
522         }
523         cfs_read_unlock(&obd_dev_lock);
524
525         return NULL;
526 }
527 EXPORT_SYMBOL(class_find_client_obd);
528
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530    searching at *next, and if a device is found, the next index to look
531    at is saved in *next. If next is NULL, then the first matching device
532    will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
534 {
535         int i;
536
537         if (next == NULL)
538                 i = 0;
539         else if (*next >= 0 && *next < class_devno_max())
540                 i = *next;
541         else
542                 return NULL;
543
544         cfs_read_lock(&obd_dev_lock);
545         for (; i < class_devno_max(); i++) {
546                 struct obd_device *obd = class_num2obd(i);
547
548                 if (obd == NULL)
549                         continue;
550                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
551                         if (next != NULL)
552                                 *next = i+1;
553                         cfs_read_unlock(&obd_dev_lock);
554                         return obd;
555                 }
556         }
557         cfs_read_unlock(&obd_dev_lock);
558
559         return NULL;
560 }
561 EXPORT_SYMBOL(class_devices_in_group);
562
563 /**
564  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565  * adjust sptlrpc settings accordingly.
566  */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 {
569         struct obd_device  *obd;
570         const char         *type;
571         int                 i, rc = 0, rc2;
572
573         LASSERT(namelen > 0);
574
575         cfs_read_lock(&obd_dev_lock);
576         for (i = 0; i < class_devno_max(); i++) {
577                 obd = class_num2obd(i);
578
579                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
580                         continue;
581
582                 /* only notify mdc, osc, mdt, ost */
583                 type = obd->obd_type->typ_name;
584                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OST_NAME) != 0)
588                         continue;
589
590                 if (strncmp(obd->obd_name, fsname, namelen))
591                         continue;
592
593                 class_incref(obd, __FUNCTION__, obd);
594                 cfs_read_unlock(&obd_dev_lock);
595                 rc2 = obd_set_info_async(obd->obd_self_export,
596                                          sizeof(KEY_SPTLRPC_CONF),
597                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
598                 rc = rc ? rc : rc2;
599                 class_decref(obd, __FUNCTION__, obd);
600                 cfs_read_lock(&obd_dev_lock);
601         }
602         cfs_read_unlock(&obd_dev_lock);
603         return rc;
604 }
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606
607 void obd_cleanup_caches(void)
608 {
609         int rc;
610
611         ENTRY;
612         if (obd_device_cachep) {
613                 rc = cfs_mem_cache_destroy(obd_device_cachep);
614                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615                 obd_device_cachep = NULL;
616         }
617         if (obdo_cachep) {
618                 rc = cfs_mem_cache_destroy(obdo_cachep);
619                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
620                 obdo_cachep = NULL;
621         }
622         if (import_cachep) {
623                 rc = cfs_mem_cache_destroy(import_cachep);
624                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625                 import_cachep = NULL;
626         }
627         if (capa_cachep) {
628                 rc = cfs_mem_cache_destroy(capa_cachep);
629                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
630                 capa_cachep = NULL;
631         }
632         EXIT;
633 }
634
635 int obd_init_caches(void)
636 {
637         ENTRY;
638
639         LASSERT(obd_device_cachep == NULL);
640         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641                                                  sizeof(struct obd_device),
642                                                  0, 0);
643         if (!obd_device_cachep)
644                 GOTO(out, -ENOMEM);
645
646         LASSERT(obdo_cachep == NULL);
647         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
648                                            0, 0);
649         if (!obdo_cachep)
650                 GOTO(out, -ENOMEM);
651
652         LASSERT(import_cachep == NULL);
653         import_cachep = cfs_mem_cache_create("ll_import_cache",
654                                              sizeof(struct obd_import),
655                                              0, 0);
656         if (!import_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(capa_cachep == NULL);
660         capa_cachep = cfs_mem_cache_create("capa_cache",
661                                            sizeof(struct obd_capa), 0, 0);
662         if (!capa_cachep)
663                 GOTO(out, -ENOMEM);
664
665         RETURN(0);
666  out:
667         obd_cleanup_caches();
668         RETURN(-ENOMEM);
669
670 }
671
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 {
675         struct obd_export *export;
676         ENTRY;
677
678         if (!conn) {
679                 CDEBUG(D_CACHE, "looking for null handle\n");
680                 RETURN(NULL);
681         }
682
683         if (conn->cookie == -1) {  /* this means assign a new connection */
684                 CDEBUG(D_CACHE, "want a new connection\n");
685                 RETURN(NULL);
686         }
687
688         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689         export = class_handle2object(conn->cookie);
690         RETURN(export);
691 }
692 EXPORT_SYMBOL(class_conn2export);
693
694 struct obd_device *class_exp2obd(struct obd_export *exp)
695 {
696         if (exp)
697                 return exp->exp_obd;
698         return NULL;
699 }
700 EXPORT_SYMBOL(class_exp2obd);
701
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 {
704         struct obd_export *export;
705         export = class_conn2export(conn);
706         if (export) {
707                 struct obd_device *obd = export->exp_obd;
708                 class_export_put(export);
709                 return obd;
710         }
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_conn2obd);
714
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         if (obd == NULL)
719                 return NULL;
720         return obd->u.cli.cl_import;
721 }
722 EXPORT_SYMBOL(class_exp2cliimp);
723
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 {
726         struct obd_device *obd = class_conn2obd(conn);
727         if (obd == NULL)
728                 return NULL;
729         return obd->u.cli.cl_import;
730 }
731 EXPORT_SYMBOL(class_conn2cliimp);
732
733 /* Export management functions */
734 static void class_export_destroy(struct obd_export *exp)
735 {
736         struct obd_device *obd = exp->exp_obd;
737         ENTRY;
738
739         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
740
741         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
742                exp->exp_client_uuid.uuid, obd->obd_name);
743
744         LASSERT(obd != NULL);
745
746         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747         if (exp->exp_connection)
748                 ptlrpc_put_connection_superhack(exp->exp_connection);
749
750         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
754         obd_destroy_export(exp);
755         class_decref(obd, "export", exp);
756
757         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
758         EXIT;
759 }
760
/* Addref callback registered with class_handle_hash() for exports:
 * takes one export reference on the object behind the handle. */
static void export_handle_addref(void *export)
{
        (void)class_export_get(export);
}
765
766 struct obd_export *class_export_get(struct obd_export *exp)
767 {
768         cfs_atomic_inc(&exp->exp_refcount);
769         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
770                cfs_atomic_read(&exp->exp_refcount));
771         return exp;
772 }
773 EXPORT_SYMBOL(class_export_get);
774
775 void class_export_put(struct obd_export *exp)
776 {
777         LASSERT(exp != NULL);
778         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
779         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
780                cfs_atomic_read(&exp->exp_refcount) - 1);
781
782         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
783                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
784                 CDEBUG(D_IOCTL, "final put %p/%s\n",
785                        exp, exp->exp_client_uuid.uuid);
786
787                 /* release nid stat refererence */
788                 lprocfs_exp_cleanup(exp);
789
790                 obd_zombie_export_add(exp);
791         }
792 }
793 EXPORT_SYMBOL(class_export_put);
794
795 /* Creates a new export, adds it to the hash table, and returns a
796  * pointer to it. The refcount is 2: one for the hash reference, and
797  * one for the pointer returned by this function. */
798 struct obd_export *class_new_export(struct obd_device *obd,
799                                     struct obd_uuid *cluuid)
800 {
801         struct obd_export *export;
802         cfs_hash_t *hash = NULL;
803         int rc = 0;
804         ENTRY;
805
806         OBD_ALLOC_PTR(export);
807         if (!export)
808                 return ERR_PTR(-ENOMEM);
809
810         export->exp_conn_cnt = 0;
811         export->exp_lock_hash = NULL;
812         cfs_atomic_set(&export->exp_refcount, 2);
813         cfs_atomic_set(&export->exp_rpc_count, 0);
814         cfs_atomic_set(&export->exp_cb_count, 0);
815         cfs_atomic_set(&export->exp_locks_count, 0);
816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
817         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
818         cfs_spin_lock_init(&export->exp_locks_list_guard);
819 #endif
820         cfs_atomic_set(&export->exp_replay_count, 0);
821         export->exp_obd = obd;
822         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
823         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
824         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
825         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
826         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
827         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
828         class_handle_hash(&export->exp_handle, export_handle_addref);
829         export->exp_last_request_time = cfs_time_current_sec();
830         cfs_spin_lock_init(&export->exp_lock);
831         cfs_spin_lock_init(&export->exp_rpc_lock);
832         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
833         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
834
835         export->exp_sp_peer = LUSTRE_SP_ANY;
836         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
837         export->exp_client_uuid = *cluuid;
838         obd_init_export(export);
839
840         cfs_spin_lock(&obd->obd_dev_lock);
841          /* shouldn't happen, but might race */
842         if (obd->obd_stopping)
843                 GOTO(exit_unlock, rc = -ENODEV);
844
845         hash = cfs_hash_getref(obd->obd_uuid_hash);
846         if (hash == NULL)
847                 GOTO(exit_unlock, rc = -ENODEV);
848         cfs_spin_unlock(&obd->obd_dev_lock);
849
850         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
851                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
852                 if (rc != 0) {
853                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
854                                       obd->obd_name, cluuid->uuid, rc);
855                         GOTO(exit_err, rc = -EALREADY);
856                 }
857         }
858
859         cfs_spin_lock(&obd->obd_dev_lock);
860         if (obd->obd_stopping) {
861                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
862                 GOTO(exit_unlock, rc = -ENODEV);
863         }
864
865         class_incref(obd, "export", export);
866         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
867         cfs_list_add_tail(&export->exp_obd_chain_timed,
868                           &export->exp_obd->obd_exports_timed);
869         export->exp_obd->obd_num_exports++;
870         cfs_spin_unlock(&obd->obd_dev_lock);
871         cfs_hash_putref(hash);
872         RETURN(export);
873
874 exit_unlock:
875         cfs_spin_unlock(&obd->obd_dev_lock);
876 exit_err:
877         if (hash)
878                 cfs_hash_putref(hash);
879         class_handle_unhash(&export->exp_handle);
880         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
881         obd_destroy_export(export);
882         OBD_FREE_PTR(export);
883         return ERR_PTR(rc);
884 }
885 EXPORT_SYMBOL(class_new_export);
886
887 void class_unlink_export(struct obd_export *exp)
888 {
889         class_handle_unhash(&exp->exp_handle);
890
891         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
892         /* delete an uuid-export hashitem from hashtables */
893         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
894                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
895                              &exp->exp_client_uuid,
896                              &exp->exp_uuid_hash);
897
898         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
899         cfs_list_del_init(&exp->exp_obd_chain_timed);
900         exp->exp_obd->obd_num_exports--;
901         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
902         class_export_put(exp);
903 }
904 EXPORT_SYMBOL(class_unlink_export);
905
906 /* Import management functions */
907 void class_import_destroy(struct obd_import *imp)
908 {
909         ENTRY;
910
911         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
912                 imp->imp_obd->obd_name);
913
914         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
915
916         ptlrpc_put_connection_superhack(imp->imp_connection);
917
918         while (!cfs_list_empty(&imp->imp_conn_list)) {
919                 struct obd_import_conn *imp_conn;
920
921                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
922                                           struct obd_import_conn, oic_item);
923                 cfs_list_del_init(&imp_conn->oic_item);
924                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
925                 OBD_FREE(imp_conn, sizeof(*imp_conn));
926         }
927
928         LASSERT(imp->imp_sec == NULL);
929         class_decref(imp->imp_obd, "import", imp);
930         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
931         EXIT;
932 }
933
/**
 * Handle-table addref callback for imports (registered through
 * class_handle_hash() in class_new_import()): takes one import reference.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
938
939 struct obd_import *class_import_get(struct obd_import *import)
940 {
941         cfs_atomic_inc(&import->imp_refcount);
942         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
943                cfs_atomic_read(&import->imp_refcount),
944                import->imp_obd->obd_name);
945         return import;
946 }
947 EXPORT_SYMBOL(class_import_get);
948
949 void class_import_put(struct obd_import *imp)
950 {
951         ENTRY;
952
953         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
954         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
955
956         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
957                cfs_atomic_read(&imp->imp_refcount) - 1,
958                imp->imp_obd->obd_name);
959
960         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
961                 CDEBUG(D_INFO, "final put import %p\n", imp);
962                 obd_zombie_import_add(imp);
963         }
964
965         EXIT;
966 }
967 EXPORT_SYMBOL(class_import_put);
968
969 static void init_imp_at(struct imp_at *at) {
970         int i;
971         at_init(&at->iat_net_latency, 0, 0);
972         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
973                 /* max service estimates are tracked on the server side, so
974                    don't use the AT history here, just use the last reported
975                    val. (But keep hist for proc histogram, worst_ever) */
976                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
977                         AT_FLG_NOHIST);
978         }
979 }
980
981 struct obd_import *class_new_import(struct obd_device *obd)
982 {
983         struct obd_import *imp;
984
985         OBD_ALLOC(imp, sizeof(*imp));
986         if (imp == NULL)
987                 return NULL;
988
989         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
990         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
991         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
992         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
993         cfs_spin_lock_init(&imp->imp_lock);
994         imp->imp_last_success_conn = 0;
995         imp->imp_state = LUSTRE_IMP_NEW;
996         imp->imp_obd = class_incref(obd, "import", imp);
997         cfs_sema_init(&imp->imp_sec_mutex, 1);
998         cfs_waitq_init(&imp->imp_recovery_waitq);
999
1000         cfs_atomic_set(&imp->imp_refcount, 2);
1001         cfs_atomic_set(&imp->imp_unregistering, 0);
1002         cfs_atomic_set(&imp->imp_inflight, 0);
1003         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1004         cfs_atomic_set(&imp->imp_inval_count, 0);
1005         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1006         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1007         class_handle_hash(&imp->imp_handle, import_handle_addref);
1008         init_imp_at(&imp->imp_at);
1009
1010         /* the default magic is V2, will be used in connect RPC, and
1011          * then adjusted according to the flags in request/reply. */
1012         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1013
1014         return imp;
1015 }
1016 EXPORT_SYMBOL(class_new_import);
1017
1018 void class_destroy_import(struct obd_import *import)
1019 {
1020         LASSERT(import != NULL);
1021         LASSERT(import != LP_POISON);
1022
1023         class_handle_unhash(&import->imp_handle);
1024
1025         cfs_spin_lock(&import->imp_lock);
1026         import->imp_generation++;
1027         cfs_spin_unlock(&import->imp_lock);
1028         class_import_put(import);
1029 }
1030 EXPORT_SYMBOL(class_destroy_import);
1031
1032 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1033
1034 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1035 {
1036         cfs_spin_lock(&exp->exp_locks_list_guard);
1037
1038         LASSERT(lock->l_exp_refs_nr >= 0);
1039
1040         if (lock->l_exp_refs_target != NULL &&
1041             lock->l_exp_refs_target != exp) {
1042                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1043                               exp, lock, lock->l_exp_refs_target);
1044         }
1045         if ((lock->l_exp_refs_nr ++) == 0) {
1046                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1047                 lock->l_exp_refs_target = exp;
1048         }
1049         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1050                lock, exp, lock->l_exp_refs_nr);
1051         cfs_spin_unlock(&exp->exp_locks_list_guard);
1052 }
1053 EXPORT_SYMBOL(__class_export_add_lock_ref);
1054
1055 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1056 {
1057         cfs_spin_lock(&exp->exp_locks_list_guard);
1058         LASSERT(lock->l_exp_refs_nr > 0);
1059         if (lock->l_exp_refs_target != exp) {
1060                 LCONSOLE_WARN("lock %p, "
1061                               "mismatching export pointers: %p, %p\n",
1062                               lock, lock->l_exp_refs_target, exp);
1063         }
1064         if (-- lock->l_exp_refs_nr == 0) {
1065                 cfs_list_del_init(&lock->l_exp_refs_link);
1066                 lock->l_exp_refs_target = NULL;
1067         }
1068         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1069                lock, exp, lock->l_exp_refs_nr);
1070         cfs_spin_unlock(&exp->exp_locks_list_guard);
1071 }
1072 EXPORT_SYMBOL(__class_export_del_lock_ref);
1073 #endif
1074
1075 /* A connection defines an export context in which preallocation can
1076    be managed. This releases the export pointer reference, and returns
1077    the export handle, so the export refcount is 1 when this function
1078    returns. */
1079 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1080                   struct obd_uuid *cluuid)
1081 {
1082         struct obd_export *export;
1083         LASSERT(conn != NULL);
1084         LASSERT(obd != NULL);
1085         LASSERT(cluuid != NULL);
1086         ENTRY;
1087
1088         export = class_new_export(obd, cluuid);
1089         if (IS_ERR(export))
1090                 RETURN(PTR_ERR(export));
1091
1092         conn->cookie = export->exp_handle.h_cookie;
1093         class_export_put(export);
1094
1095         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1096                cluuid->uuid, conn->cookie);
1097         RETURN(0);
1098 }
1099 EXPORT_SYMBOL(class_connect);
1100
1101 /* if export is involved in recovery then clean up related things */
1102 void class_export_recovery_cleanup(struct obd_export *exp)
1103 {
1104         struct obd_device *obd = exp->exp_obd;
1105
1106         cfs_spin_lock(&obd->obd_recovery_task_lock);
1107         if (exp->exp_delayed)
1108                 obd->obd_delayed_clients--;
1109         if (obd->obd_recovering && exp->exp_in_recovery) {
1110                 cfs_spin_lock(&exp->exp_lock);
1111                 exp->exp_in_recovery = 0;
1112                 cfs_spin_unlock(&exp->exp_lock);
1113                 LASSERT(obd->obd_connected_clients);
1114                 obd->obd_connected_clients--;
1115         }
1116         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1117         /** Cleanup req replay fields */
1118         if (exp->exp_req_replay_needed) {
1119                 cfs_spin_lock(&exp->exp_lock);
1120                 exp->exp_req_replay_needed = 0;
1121                 cfs_spin_unlock(&exp->exp_lock);
1122                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1123                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1124         }
1125         /** Cleanup lock replay data */
1126         if (exp->exp_lock_replay_needed) {
1127                 cfs_spin_lock(&exp->exp_lock);
1128                 exp->exp_lock_replay_needed = 0;
1129                 cfs_spin_unlock(&exp->exp_lock);
1130                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1131                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1132         }
1133 }
1134
1135 /* This function removes 1-3 references from the export:
1136  * 1 - for export pointer passed
1137  * and if disconnect really need
1138  * 2 - removing from hash
1139  * 3 - in client_unlink_export
1140  * The export pointer passed to this function can destroyed */
1141 int class_disconnect(struct obd_export *export)
1142 {
1143         int already_disconnected;
1144         ENTRY;
1145
1146         if (export == NULL) {
1147                 CWARN("attempting to free NULL export %p\n", export);
1148                 RETURN(-EINVAL);
1149         }
1150
1151         cfs_spin_lock(&export->exp_lock);
1152         already_disconnected = export->exp_disconnected;
1153         export->exp_disconnected = 1;
1154         cfs_spin_unlock(&export->exp_lock);
1155
1156         /* class_cleanup(), abort_recovery(), and class_fail_export()
1157          * all end up in here, and if any of them race we shouldn't
1158          * call extra class_export_puts(). */
1159         if (already_disconnected) {
1160                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1161                 GOTO(no_disconn, already_disconnected);
1162         }
1163
1164         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1165                export->exp_handle.h_cookie);
1166
1167         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1168                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1169                              &export->exp_connection->c_peer.nid,
1170                              &export->exp_nid_hash);
1171
1172         class_export_recovery_cleanup(export);
1173         class_unlink_export(export);
1174 no_disconn:
1175         class_export_put(export);
1176         RETURN(0);
1177 }
1178 EXPORT_SYMBOL(class_disconnect);
1179
1180 /* Return non-zero for a fully connected export */
1181 int class_connected_export(struct obd_export *exp)
1182 {
1183         if (exp) {
1184                 int connected;
1185                 cfs_spin_lock(&exp->exp_lock);
1186                 connected = (exp->exp_conn_cnt > 0);
1187                 cfs_spin_unlock(&exp->exp_lock);
1188                 return connected;
1189         }
1190         return 0;
1191 }
1192 EXPORT_SYMBOL(class_connected_export);
1193
1194 static void class_disconnect_export_list(cfs_list_t *list,
1195                                          enum obd_option flags)
1196 {
1197         int rc;
1198         struct obd_export *exp;
1199         ENTRY;
1200
1201         /* It's possible that an export may disconnect itself, but
1202          * nothing else will be added to this list. */
1203         while (!cfs_list_empty(list)) {
1204                 exp = cfs_list_entry(list->next, struct obd_export,
1205                                      exp_obd_chain);
1206                 /* need for safe call CDEBUG after obd_disconnect */
1207                 class_export_get(exp);
1208
1209                 cfs_spin_lock(&exp->exp_lock);
1210                 exp->exp_flags = flags;
1211                 cfs_spin_unlock(&exp->exp_lock);
1212
1213                 if (obd_uuid_equals(&exp->exp_client_uuid,
1214                                     &exp->exp_obd->obd_uuid)) {
1215                         CDEBUG(D_HA,
1216                                "exp %p export uuid == obd uuid, don't discon\n",
1217                                exp);
1218                         /* Need to delete this now so we don't end up pointing
1219                          * to work_list later when this export is cleaned up. */
1220                         cfs_list_del_init(&exp->exp_obd_chain);
1221                         class_export_put(exp);
1222                         continue;
1223                 }
1224
1225                 class_export_get(exp);
1226                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1227                        "last request at "CFS_TIME_T"\n",
1228                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1229                        exp, exp->exp_last_request_time);
1230                 /* release one export reference anyway */
1231                 rc = obd_disconnect(exp);
1232
1233                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1234                        obd_export_nid2str(exp), exp, rc);
1235                 class_export_put(exp);
1236         }
1237         EXIT;
1238 }
1239
1240 void class_disconnect_exports(struct obd_device *obd)
1241 {
1242         cfs_list_t work_list;
1243         ENTRY;
1244
1245         /* Move all of the exports from obd_exports to a work list, en masse. */
1246         CFS_INIT_LIST_HEAD(&work_list);
1247         cfs_spin_lock(&obd->obd_dev_lock);
1248         cfs_list_splice_init(&obd->obd_exports, &work_list);
1249         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1250         cfs_spin_unlock(&obd->obd_dev_lock);
1251
1252         if (!cfs_list_empty(&work_list)) {
1253                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1254                        "disconnecting them\n", obd->obd_minor, obd);
1255                 class_disconnect_export_list(&work_list,
1256                                              exp_flags_from_obd(obd));
1257         } else
1258                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1259                        obd->obd_minor, obd);
1260         EXIT;
1261 }
1262 EXPORT_SYMBOL(class_disconnect_exports);
1263
1264 /* Remove exports that have not completed recovery.
1265  */
1266 void class_disconnect_stale_exports(struct obd_device *obd,
1267                                     int (*test_export)(struct obd_export *))
1268 {
1269         cfs_list_t work_list;
1270         cfs_list_t *pos, *n;
1271         struct obd_export *exp;
1272         int evicted = 0;
1273         ENTRY;
1274
1275         CFS_INIT_LIST_HEAD(&work_list);
1276         cfs_spin_lock(&obd->obd_dev_lock);
1277         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1278                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1279                 if (test_export(exp))
1280                         continue;
1281
1282                 /* don't count self-export as client */
1283                 if (obd_uuid_equals(&exp->exp_client_uuid,
1284                                     &exp->exp_obd->obd_uuid))
1285                         continue;
1286
1287                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1288                 evicted++;
1289                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1290                        obd->obd_name, exp->exp_client_uuid.uuid,
1291                        exp->exp_connection == NULL ? "<unknown>" :
1292                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1293                 print_export_data(exp, "EVICTING", 0);
1294         }
1295         cfs_spin_unlock(&obd->obd_dev_lock);
1296
1297         if (evicted) {
1298                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1299                        obd->obd_name, evicted);
1300                 obd->obd_stale_clients += evicted;
1301         }
1302         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1303                                                  OBD_OPT_ABORT_RECOV);
1304         EXIT;
1305 }
1306 EXPORT_SYMBOL(class_disconnect_stale_exports);
1307
1308 void class_fail_export(struct obd_export *exp)
1309 {
1310         int rc, already_failed;
1311
1312         cfs_spin_lock(&exp->exp_lock);
1313         already_failed = exp->exp_failed;
1314         exp->exp_failed = 1;
1315         cfs_spin_unlock(&exp->exp_lock);
1316
1317         if (already_failed) {
1318                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1319                        exp, exp->exp_client_uuid.uuid);
1320                 return;
1321         }
1322
1323         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1324                exp, exp->exp_client_uuid.uuid);
1325
1326         if (obd_dump_on_timeout)
1327                 libcfs_debug_dumplog();
1328
1329         /* Most callers into obd_disconnect are removing their own reference
1330          * (request, for example) in addition to the one from the hash table.
1331          * We don't have such a reference here, so make one. */
1332         class_export_get(exp);
1333         rc = obd_disconnect(exp);
1334         if (rc)
1335                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1336         else
1337                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1338                        exp, exp->exp_client_uuid.uuid);
1339 }
1340 EXPORT_SYMBOL(class_fail_export);
1341
1342 char *obd_export_nid2str(struct obd_export *exp)
1343 {
1344         if (exp->exp_connection != NULL)
1345                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1346
1347         return "(no nid)";
1348 }
1349 EXPORT_SYMBOL(obd_export_nid2str);
1350
1351 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1352 {
1353         struct obd_export *doomed_exp = NULL;
1354         int exports_evicted = 0;
1355
1356         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1357
1358         do {
1359                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1360                 if (doomed_exp == NULL)
1361                         break;
1362
1363                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1364                          "nid %s found, wanted nid %s, requested nid %s\n",
1365                          obd_export_nid2str(doomed_exp),
1366                          libcfs_nid2str(nid_key), nid);
1367                 LASSERTF(doomed_exp != obd->obd_self_export,
1368                          "self-export is hashed by NID?\n");
1369                 exports_evicted++;
1370                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1371                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1372                        exports_evicted);
1373                 class_fail_export(doomed_exp);
1374                 class_export_put(doomed_exp);
1375         } while (1);
1376
1377         if (!exports_evicted)
1378                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1379                        obd->obd_name, nid);
1380         return exports_evicted;
1381 }
1382 EXPORT_SYMBOL(obd_export_evict_by_nid);
1383
1384 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1385 {
1386         struct obd_export *doomed_exp = NULL;
1387         struct obd_uuid doomed_uuid;
1388         int exports_evicted = 0;
1389
1390         obd_str2uuid(&doomed_uuid, uuid);
1391         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1392                 CERROR("%s: can't evict myself\n", obd->obd_name);
1393                 return exports_evicted;
1394         }
1395
1396         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1397
1398         if (doomed_exp == NULL) {
1399                 CERROR("%s: can't disconnect %s: no exports found\n",
1400                        obd->obd_name, uuid);
1401         } else {
1402                 CWARN("%s: evicting %s at adminstrative request\n",
1403                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1404                 class_fail_export(doomed_exp);
1405                 class_export_put(doomed_exp);
1406                 exports_evicted++;
1407         }
1408
1409         return exports_evicted;
1410 }
1411 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1412
1413 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook called by print_export_data() when lock dumping is
 * requested; NULL (the default for a file-scope pointer) means no hook. */
void (*class_export_dump_hook)(struct obd_export *);
1415 EXPORT_SYMBOL(class_export_dump_hook);
1416 #endif
1417
1418 static void print_export_data(struct obd_export *exp, const char *status,
1419                               int locks)
1420 {
1421         struct ptlrpc_reply_state *rs;
1422         struct ptlrpc_reply_state *first_reply = NULL;
1423         int nreplies = 0;
1424
1425         cfs_spin_lock(&exp->exp_lock);
1426         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1427                                 rs_exp_list) {
1428                 if (nreplies == 0)
1429                         first_reply = rs;
1430                 nreplies++;
1431         }
1432         cfs_spin_unlock(&exp->exp_lock);
1433
1434         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1435                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1436                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1437                cfs_atomic_read(&exp->exp_rpc_count),
1438                cfs_atomic_read(&exp->exp_cb_count),
1439                cfs_atomic_read(&exp->exp_locks_count),
1440                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1441                nreplies, first_reply, nreplies > 3 ? "..." : "",
1442                exp->exp_last_committed);
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444         if (locks && class_export_dump_hook != NULL)
1445                 class_export_dump_hook(exp);
1446 #endif
1447 }
1448
1449 void dump_exports(struct obd_device *obd, int locks)
1450 {
1451         struct obd_export *exp;
1452
1453         cfs_spin_lock(&obd->obd_dev_lock);
1454         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1455                 print_export_data(exp, "ACTIVE", locks);
1456         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1457                 print_export_data(exp, "UNLINKED", locks);
1458         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1459                 print_export_data(exp, "DELAYED", locks);
1460         cfs_spin_unlock(&obd->obd_dev_lock);
1461         cfs_spin_lock(&obd_zombie_impexp_lock);
1462         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1463                 print_export_data(exp, "ZOMBIE", locks);
1464         cfs_spin_unlock(&obd_zombie_impexp_lock);
1465 }
1466 EXPORT_SYMBOL(dump_exports);
1467
1468 void obd_exports_barrier(struct obd_device *obd)
1469 {
1470         int waited = 2;
1471         LASSERT(cfs_list_empty(&obd->obd_exports));
1472         cfs_spin_lock(&obd->obd_dev_lock);
1473         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1474                 cfs_spin_unlock(&obd->obd_dev_lock);
1475                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1476                                                    cfs_time_seconds(waited));
1477                 if (waited > 5 && IS_PO2(waited)) {
1478                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1479                                       "more than %d seconds. "
1480                                       "The obd refcount = %d. Is it stuck?\n",
1481                                       obd->obd_name, waited,
1482                                       cfs_atomic_read(&obd->obd_refcount));
1483                         dump_exports(obd, 1);
1484                 }
1485                 waited *= 2;
1486                 cfs_spin_lock(&obd->obd_dev_lock);
1487         }
1488         cfs_spin_unlock(&obd->obd_dev_lock);
1489 }
1490 EXPORT_SYMBOL(obd_exports_barrier);
1491
/* Number of zombie imports and exports queued for destruction but not yet
 * destroyed; modified under obd_zombie_impexp_lock. Starts at zero, as
 * any file-scope static does. */
static int zombies_count;
1494
1495 /**
1496  * kill zombie imports and exports
1497  */
1498 void obd_zombie_impexp_cull(void)
1499 {
1500         struct obd_import *import;
1501         struct obd_export *export;
1502         ENTRY;
1503
1504         do {
1505                 cfs_spin_lock(&obd_zombie_impexp_lock);
1506
1507                 import = NULL;
1508                 if (!cfs_list_empty(&obd_zombie_imports)) {
1509                         import = cfs_list_entry(obd_zombie_imports.next,
1510                                                 struct obd_import,
1511                                                 imp_zombie_chain);
1512                         cfs_list_del_init(&import->imp_zombie_chain);
1513                 }
1514
1515                 export = NULL;
1516                 if (!cfs_list_empty(&obd_zombie_exports)) {
1517                         export = cfs_list_entry(obd_zombie_exports.next,
1518                                                 struct obd_export,
1519                                                 exp_obd_chain);
1520                         cfs_list_del_init(&export->exp_obd_chain);
1521                 }
1522
1523                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1524
1525                 if (import != NULL) {
1526                         class_import_destroy(import);
1527                         cfs_spin_lock(&obd_zombie_impexp_lock);
1528                         zombies_count--;
1529                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1530                 }
1531
1532                 if (export != NULL) {
1533                         class_export_destroy(export);
1534                         cfs_spin_lock(&obd_zombie_impexp_lock);
1535                         zombies_count--;
1536                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1537                 }
1538
1539                 cfs_cond_resched();
1540         } while (import != NULL || export != NULL);
1541         EXIT;
1542 }
1543
1544 static cfs_completion_t         obd_zombie_start;
1545 static cfs_completion_t         obd_zombie_stop;
1546 static unsigned long            obd_zombie_flags;
1547 static cfs_waitq_t              obd_zombie_waitq;
1548 static pid_t                    obd_zombie_pid;
1549
1550 enum {
1551         OBD_ZOMBIE_STOP   = 1 << 1
1552 };
1553
1554 /**
1555  * check for work for kill zombie import/export thread.
1556  */
1557 static int obd_zombie_impexp_check(void *arg)
1558 {
1559         int rc;
1560
1561         cfs_spin_lock(&obd_zombie_impexp_lock);
1562         rc = (zombies_count == 0) &&
1563              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1564         cfs_spin_unlock(&obd_zombie_impexp_lock);
1565
1566         RETURN(rc);
1567 }
1568
1569 /**
1570  * Add export to the obd_zombe thread and notify it.
1571  */
1572 static void obd_zombie_export_add(struct obd_export *exp) {
1573         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1574         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1575         cfs_list_del_init(&exp->exp_obd_chain);
1576         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1577         cfs_spin_lock(&obd_zombie_impexp_lock);
1578         zombies_count++;
1579         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1580         cfs_spin_unlock(&obd_zombie_impexp_lock);
1581
1582         if (obd_zombie_impexp_notify != NULL)
1583                 obd_zombie_impexp_notify();
1584 }
1585
1586 /**
1587  * Add import to the obd_zombe thread and notify it.
1588  */
1589 static void obd_zombie_import_add(struct obd_import *imp) {
1590         LASSERT(imp->imp_sec == NULL);
1591         cfs_spin_lock(&obd_zombie_impexp_lock);
1592         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1593         zombies_count++;
1594         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1595         cfs_spin_unlock(&obd_zombie_impexp_lock);
1596
1597         if (obd_zombie_impexp_notify != NULL)
1598                 obd_zombie_impexp_notify();
1599 }
1600
1601 /**
1602  * notify import/export destroy thread about new zombie.
1603  */
1604 static void obd_zombie_impexp_notify(void)
1605 {
1606         /*
1607          * Make sure obd_zomebie_impexp_thread get this notification.
1608          * It is possible this signal only get by obd_zombie_barrier, and
1609          * barrier gulps this notification and sleeps away and hangs ensues
1610          */
1611         cfs_waitq_broadcast(&obd_zombie_waitq);
1612 }
1613
1614 /**
1615  * check whether obd_zombie is idle
1616  */
1617 static int obd_zombie_is_idle(void)
1618 {
1619         int rc;
1620
1621         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1622         cfs_spin_lock(&obd_zombie_impexp_lock);
1623         rc = (zombies_count == 0);
1624         cfs_spin_unlock(&obd_zombie_impexp_lock);
1625         return rc;
1626 }
1627
1628 /**
1629  * wait when obd_zombie import/export queues become empty
1630  */
1631 void obd_zombie_barrier(void)
1632 {
1633         struct l_wait_info lwi = { 0 };
1634
1635         if (obd_zombie_pid == cfs_curproc_pid())
1636                 /* don't wait for myself */
1637                 return;
1638         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1639 }
1640 EXPORT_SYMBOL(obd_zombie_barrier);
1641
1642 #ifdef __KERNEL__
1643
1644 /**
1645  * destroy zombie export/import thread.
1646  */
1647 static int obd_zombie_impexp_thread(void *unused)
1648 {
1649         int rc;
1650
1651         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1652                 cfs_complete(&obd_zombie_start);
1653                 RETURN(rc);
1654         }
1655
1656         cfs_complete(&obd_zombie_start);
1657
1658         obd_zombie_pid = cfs_curproc_pid();
1659
1660         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1661                 struct l_wait_info lwi = { 0 };
1662
1663                 l_wait_event(obd_zombie_waitq,
1664                              !obd_zombie_impexp_check(NULL), &lwi);
1665                 obd_zombie_impexp_cull();
1666
1667                 /*
1668                  * Notify obd_zombie_barrier callers that queues
1669                  * may be empty.
1670                  */
1671                 cfs_waitq_signal(&obd_zombie_waitq);
1672         }
1673
1674         cfs_complete(&obd_zombie_stop);
1675
1676         RETURN(0);
1677 }
1678
1679 #else /* ! KERNEL */
1680
1681 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1682 static void *obd_zombie_impexp_work_cb;
1683 static void *obd_zombie_impexp_idle_cb;
1684
1685 int obd_zombie_impexp_kill(void *arg)
1686 {
1687         int rc = 0;
1688
1689         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1690                 obd_zombie_impexp_cull();
1691                 rc = 1;
1692         }
1693         cfs_atomic_dec(&zombie_recur);
1694         return rc;
1695 }
1696
1697 #endif
1698
1699 /**
1700  * start destroy zombie import/export thread
1701  */
1702 int obd_zombie_impexp_init(void)
1703 {
1704         int rc;
1705
1706         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1707         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1708         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1709         cfs_init_completion(&obd_zombie_start);
1710         cfs_init_completion(&obd_zombie_stop);
1711         cfs_waitq_init(&obd_zombie_waitq);
1712         obd_zombie_pid = 0;
1713
1714 #ifdef __KERNEL__
1715         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1716         if (rc < 0)
1717                 RETURN(rc);
1718
1719         cfs_wait_for_completion(&obd_zombie_start);
1720 #else
1721
1722         obd_zombie_impexp_work_cb =
1723                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1724                                                  &obd_zombie_impexp_kill, NULL);
1725
1726         obd_zombie_impexp_idle_cb =
1727                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1728                                                  &obd_zombie_impexp_check, NULL);
1729         rc = 0;
1730 #endif
1731         RETURN(rc);
1732 }
1733 /**
1734  * stop destroy zombie import/export thread
1735  */
1736 void obd_zombie_impexp_stop(void)
1737 {
1738         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1739         obd_zombie_impexp_notify();
1740 #ifdef __KERNEL__
1741         cfs_wait_for_completion(&obd_zombie_stop);
1742 #else
1743         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1744         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1745 #endif
1746 }
1747
1748 /***** Kernel-userspace comm helpers *******/
1749
1750 /* Get length of entire message, including header */
1751 int kuc_len(int payload_len)
1752 {
1753         return sizeof(struct kuc_hdr) + payload_len;
1754 }
1755 EXPORT_SYMBOL(kuc_len);
1756
1757 /* Get a pointer to kuc header, given a ptr to the payload
1758  * @param p Pointer to payload area
1759  * @returns Pointer to kuc header
1760  */
1761 struct kuc_hdr * kuc_ptr(void *p)
1762 {
1763         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1764         LASSERT(lh->kuc_magic == KUC_MAGIC);
1765         return lh;
1766 }
1767 EXPORT_SYMBOL(kuc_ptr);
1768
1769 /* Test if payload is part of kuc message
1770  * @param p Pointer to payload area
1771  * @returns boolean
1772  */
1773 int kuc_ispayload(void *p)
1774 {
1775         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1776
1777         if (kh->kuc_magic == KUC_MAGIC)
1778                 return 1;
1779         else
1780                 return 0;
1781 }
1782 EXPORT_SYMBOL(kuc_ispayload);
1783
1784 /* Alloc space for a message, and fill in header
1785  * @return Pointer to payload area
1786  */
1787 void *kuc_alloc(int payload_len, int transport, int type)
1788 {
1789         struct kuc_hdr *lh;
1790         int len = kuc_len(payload_len);
1791
1792         OBD_ALLOC(lh, len);
1793         if (lh == NULL)
1794                 return ERR_PTR(-ENOMEM);
1795
1796         lh->kuc_magic = KUC_MAGIC;
1797         lh->kuc_transport = transport;
1798         lh->kuc_msgtype = type;
1799         lh->kuc_msglen = len;
1800
1801         return (void *)(lh + 1);
1802 }
1803 EXPORT_SYMBOL(kuc_alloc);
1804
/* Free a message given its payload pointer
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area; the size passed to
 *        OBD_FREE is recomputed from it, so it presumably has to match
 *        the payload_len the message was allocated with
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1812
1813
1814