Whamcloud - gitweb
853dd865ae51ee33be5b676808e432398ac6eaeb
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types (defined elsewhere).  It is walked by
 * class_search_type() and modified by class_register_type() and
 * class_unregister_type(), always under obd_types_lock. */
extern cfs_list_t obd_types;
cfs_spinlock_t obd_types_lock;

/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(): one each for struct obd_device, struct obdo and
 * struct obd_import. */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* "Zombie" imports/exports.  class_export_put() hands an export to
 * obd_zombie_export_add() when its last reference is dropped.
 * NOTE(review): presumably these lists queue objects for deferred
 * destruction and obd_zombie_impexp_lock guards them - confirm against
 * the definitions later in this file. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook used by class_export_destroy() and class_import_destroy() to drop
 * a ptlrpc connection reference.  It is not assigned in this part of the
 * file; NOTE(review): presumably installed by the ptlrpc module. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83 EXPORT_SYMBOL(obd_device_alloc);
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef CONFIG_KMOD
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149
150 #define CLASS_MAX_NAME 1024
151
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153                         struct lprocfs_vars *vars, const char *name,
154                         struct lu_device_type *ldt)
155 {
156         struct obd_type *type;
157         int rc = 0;
158         ENTRY;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 RETURN(-EEXIST);
166         }
167
168         rc = -ENOMEM;
169         OBD_ALLOC(type, sizeof(*type));
170         if (type == NULL)
171                 RETURN(rc);
172
173         OBD_ALLOC_PTR(type->typ_dt_ops);
174         OBD_ALLOC_PTR(type->typ_md_ops);
175         OBD_ALLOC(type->typ_name, strlen(name) + 1);
176
177         if (type->typ_dt_ops == NULL ||
178             type->typ_md_ops == NULL ||
179             type->typ_name == NULL)
180                 GOTO (failed, rc);
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         cfs_spin_lock_init(&type->obd_type_lock);
188
189 #ifdef LPROCFS
190         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191                                               vars, type);
192         if (IS_ERR(type->typ_procroot)) {
193                 rc = PTR_ERR(type->typ_procroot);
194                 type->typ_procroot = NULL;
195                 GOTO (failed, rc);
196         }
197 #endif
198         if (ldt != NULL) {
199                 type->typ_lu = ldt;
200                 rc = lu_device_type_init(ldt);
201                 if (rc != 0)
202                         GOTO (failed, rc);
203         }
204
205         cfs_spin_lock(&obd_types_lock);
206         cfs_list_add(&type->typ_chain, &obd_types);
207         cfs_spin_unlock(&obd_types_lock);
208
209         RETURN (0);
210
211  failed:
212         if (type->typ_name != NULL)
213                 OBD_FREE(type->typ_name, strlen(name) + 1);
214         if (type->typ_md_ops != NULL)
215                 OBD_FREE_PTR(type->typ_md_ops);
216         if (type->typ_dt_ops != NULL)
217                 OBD_FREE_PTR(type->typ_dt_ops);
218         OBD_FREE(type, sizeof(*type));
219         RETURN(rc);
220 }
221
222 int class_unregister_type(const char *name)
223 {
224         struct obd_type *type = class_search_type(name);
225         ENTRY;
226
227         if (!type) {
228                 CERROR("unknown obd type\n");
229                 RETURN(-EINVAL);
230         }
231
232         if (type->typ_refcnt) {
233                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234                 /* This is a bad situation, let's make the best of it */
235                 /* Remove ops, but leave the name for debugging */
236                 OBD_FREE_PTR(type->typ_dt_ops);
237                 OBD_FREE_PTR(type->typ_md_ops);
238                 RETURN(-EBUSY);
239         }
240
241         if (type->typ_procroot) {
242                 lprocfs_remove(&type->typ_procroot);
243         }
244
245         if (type->typ_lu)
246                 lu_device_type_fini(type->typ_lu);
247
248         cfs_spin_lock(&obd_types_lock);
249         cfs_list_del(&type->typ_chain);
250         cfs_spin_unlock(&obd_types_lock);
251         OBD_FREE(type->typ_name, strlen(name) + 1);
252         if (type->typ_dt_ops != NULL)
253                 OBD_FREE_PTR(type->typ_dt_ops);
254         if (type->typ_md_ops != NULL)
255                 OBD_FREE_PTR(type->typ_md_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(0);
258 } /* class_unregister_type */
259
260 /**
261  * Create a new obd device.
262  *
263  * Find an empty slot in ::obd_devs[], create a new obd device in it.
264  *
265  * \param[in] type_name obd device type string.
266  * \param[in] name      obd device name.
267  *
268  * \retval NULL if create fails, otherwise return the obd device
269  *         pointer created.
270  */
271 struct obd_device *class_newdev(const char *type_name, const char *name)
272 {
273         struct obd_device *result = NULL;
274         struct obd_device *newdev;
275         struct obd_type *type = NULL;
276         int i;
277         int new_obd_minor = 0;
278
279         if (strlen(name) >= MAX_OBD_NAME) {
280                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281                 RETURN(ERR_PTR(-EINVAL));
282         }
283
284         type = class_get_type(type_name);
285         if (type == NULL){
286                 CERROR("OBD: unknown type: %s\n", type_name);
287                 RETURN(ERR_PTR(-ENODEV));
288         }
289
290         newdev = obd_device_alloc();
291         if (newdev == NULL) {
292                 class_put_type(type);
293                 RETURN(ERR_PTR(-ENOMEM));
294         }
295         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296
297         cfs_spin_lock(&obd_dev_lock);
298         for (i = 0; i < class_devno_max(); i++) {
299                 struct obd_device *obd = class_num2obd(i);
300                 if (obd && obd->obd_name &&
301                     (strcmp(name, obd->obd_name) == 0)) {
302                         CERROR("Device %s already exists, won't add\n", name);
303                         if (result) {
304                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305                                          "%p obd_magic %08x != %08x\n", result,
306                                          result->obd_magic, OBD_DEVICE_MAGIC);
307                                 LASSERTF(result->obd_minor == new_obd_minor,
308                                          "%p obd_minor %d != %d\n", result,
309                                          result->obd_minor, new_obd_minor);
310
311                                 obd_devs[result->obd_minor] = NULL;
312                                 result->obd_name[0]='\0';
313                          }
314                         result = ERR_PTR(-EEXIST);
315                         break;
316                 }
317                 if (!result && !obd) {
318                         result = newdev;
319                         result->obd_minor = i;
320                         new_obd_minor = i;
321                         result->obd_type = type;
322                         strncpy(result->obd_name, name,
323                                 sizeof(result->obd_name) - 1);
324                         obd_devs[i] = result;
325                 }
326         }
327         cfs_spin_unlock(&obd_dev_lock);
328
329         if (result == NULL && i >= class_devno_max()) {
330                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331                        class_devno_max());
332                 result = ERR_PTR(-EOVERFLOW);
333         }
334
335         if (IS_ERR(result)) {
336                 obd_device_free(newdev);
337                 class_put_type(type);
338         } else {
339                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340                        result->obd_name, result);
341         }
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type != NULL);
354
355         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356                obd->obd_name,obd->obd_type->typ_name);
357
358         cfs_spin_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         cfs_spin_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         cfs_spin_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377                         /* Make sure we finished attaching before we give
378                            out any references */
379                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380                         if (obd->obd_attached) {
381                                 cfs_spin_unlock(&obd_dev_lock);
382                                 return i;
383                         }
384                         break;
385                 }
386         }
387         cfs_spin_unlock(&obd_dev_lock);
388
389         return -1;
390 }
391
392 struct obd_device *class_name2obd(const char *name)
393 {
394         int dev = class_name2dev(name);
395
396         if (dev < 0 || dev > class_devno_max())
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 int class_uuid2dev(struct obd_uuid *uuid)
402 {
403         int i;
404
405         cfs_spin_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         cfs_spin_unlock(&obd_dev_lock);
411                         return i;
412                 }
413         }
414         cfs_spin_unlock(&obd_dev_lock);
415
416         return -1;
417 }
418
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 {
421         int dev = class_uuid2dev(uuid);
422         if (dev < 0)
423                 return NULL;
424         return class_num2obd(dev);
425 }
426
427 /**
428  * Get obd device from ::obd_devs[]
429  *
430  * \param num [in] array index
431  *
432  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433  *         otherwise return the obd device there.
434  */
435 struct obd_device *class_num2obd(int num)
436 {
437         struct obd_device *obd = NULL;
438
439         if (num < class_devno_max()) {
440                 obd = obd_devs[num];
441                 if (obd == NULL)
442                         return NULL;
443
444                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445                          "%p obd_magic %08x != %08x\n",
446                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447                 LASSERTF(obd->obd_minor == num,
448                          "%p obd_minor %0d != %0d\n",
449                          obd, obd->obd_minor, num);
450         }
451
452         return obd;
453 }
454
455 void class_obd_list(void)
456 {
457         char *status;
458         int i;
459
460         cfs_spin_lock(&obd_dev_lock);
461         for (i = 0; i < class_devno_max(); i++) {
462                 struct obd_device *obd = class_num2obd(i);
463                 if (obd == NULL)
464                         continue;
465                 if (obd->obd_stopping)
466                         status = "ST";
467                 else if (obd->obd_set_up)
468                         status = "UP";
469                 else if (obd->obd_attached)
470                         status = "AT";
471                 else
472                         status = "--";
473                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474                          i, status, obd->obd_type->typ_name,
475                          obd->obd_name, obd->obd_uuid.uuid,
476                          cfs_atomic_read(&obd->obd_refcount));
477         }
478         cfs_spin_unlock(&obd_dev_lock);
479         return;
480 }
481
482 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
483    specified, then only the client with that uuid is returned,
484    otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486                                           const char * typ_name,
487                                           struct obd_uuid *grp_uuid)
488 {
489         int i;
490
491         cfs_spin_lock(&obd_dev_lock);
492         for (i = 0; i < class_devno_max(); i++) {
493                 struct obd_device *obd = class_num2obd(i);
494                 if (obd == NULL)
495                         continue;
496                 if ((strncmp(obd->obd_type->typ_name, typ_name,
497                              strlen(typ_name)) == 0)) {
498                         if (obd_uuid_equals(tgt_uuid,
499                                             &obd->u.cli.cl_target_uuid) &&
500                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
501                                                          &obd->obd_uuid) : 1)) {
502                                 cfs_spin_unlock(&obd_dev_lock);
503                                 return obd;
504                         }
505                 }
506         }
507         cfs_spin_unlock(&obd_dev_lock);
508
509         return NULL;
510 }
511
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513    searching at *next, and if a device is found, the next index to look
514    at is saved in *next. If next is NULL, then the first matching device
515    will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
517 {
518         int i;
519
520         if (next == NULL)
521                 i = 0;
522         else if (*next >= 0 && *next < class_devno_max())
523                 i = *next;
524         else
525                 return NULL;
526
527         cfs_spin_lock(&obd_dev_lock);
528         for (; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530                 if (obd == NULL)
531                         continue;
532                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
533                         if (next != NULL)
534                                 *next = i+1;
535                         cfs_spin_unlock(&obd_dev_lock);
536                         return obd;
537                 }
538         }
539         cfs_spin_unlock(&obd_dev_lock);
540
541         return NULL;
542 }
543
544 /**
545  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546  * adjust sptlrpc settings accordingly.
547  */
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 {
550         struct obd_device  *obd;
551         const char         *type;
552         int                 i, rc = 0, rc2;
553
554         LASSERT(namelen > 0);
555
556         cfs_spin_lock(&obd_dev_lock);
557         for (i = 0; i < class_devno_max(); i++) {
558                 obd = class_num2obd(i);
559
560                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
561                         continue;
562
563                 /* only notify mdc, osc, mdt, ost */
564                 type = obd->obd_type->typ_name;
565                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OST_NAME) != 0)
569                         continue;
570
571                 if (strncmp(obd->obd_name, fsname, namelen))
572                         continue;
573
574                 class_incref(obd, __FUNCTION__, obd);
575                 cfs_spin_unlock(&obd_dev_lock);
576                 rc2 = obd_set_info_async(obd->obd_self_export,
577                                          sizeof(KEY_SPTLRPC_CONF),
578                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
579                 rc = rc ? rc : rc2;
580                 class_decref(obd, __FUNCTION__, obd);
581                 cfs_spin_lock(&obd_dev_lock);
582         }
583         cfs_spin_unlock(&obd_dev_lock);
584         return rc;
585 }
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587
588 void obd_cleanup_caches(void)
589 {
590         int rc;
591
592         ENTRY;
593         if (obd_device_cachep) {
594                 rc = cfs_mem_cache_destroy(obd_device_cachep);
595                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596                 obd_device_cachep = NULL;
597         }
598         if (obdo_cachep) {
599                 rc = cfs_mem_cache_destroy(obdo_cachep);
600                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601                 obdo_cachep = NULL;
602         }
603         if (import_cachep) {
604                 rc = cfs_mem_cache_destroy(import_cachep);
605                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606                 import_cachep = NULL;
607         }
608         if (capa_cachep) {
609                 rc = cfs_mem_cache_destroy(capa_cachep);
610                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
611                 capa_cachep = NULL;
612         }
613         EXIT;
614 }
615
616 int obd_init_caches(void)
617 {
618         ENTRY;
619
620         LASSERT(obd_device_cachep == NULL);
621         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622                                                  sizeof(struct obd_device),
623                                                  0, 0);
624         if (!obd_device_cachep)
625                 GOTO(out, -ENOMEM);
626
627         LASSERT(obdo_cachep == NULL);
628         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
629                                            0, 0);
630         if (!obdo_cachep)
631                 GOTO(out, -ENOMEM);
632
633         LASSERT(import_cachep == NULL);
634         import_cachep = cfs_mem_cache_create("ll_import_cache",
635                                              sizeof(struct obd_import),
636                                              0, 0);
637         if (!import_cachep)
638                 GOTO(out, -ENOMEM);
639
640         LASSERT(capa_cachep == NULL);
641         capa_cachep = cfs_mem_cache_create("capa_cache",
642                                            sizeof(struct obd_capa), 0, 0);
643         if (!capa_cachep)
644                 GOTO(out, -ENOMEM);
645
646         RETURN(0);
647  out:
648         obd_cleanup_caches();
649         RETURN(-ENOMEM);
650
651 }
652
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 {
656         struct obd_export *export;
657         ENTRY;
658
659         if (!conn) {
660                 CDEBUG(D_CACHE, "looking for null handle\n");
661                 RETURN(NULL);
662         }
663
664         if (conn->cookie == -1) {  /* this means assign a new connection */
665                 CDEBUG(D_CACHE, "want a new connection\n");
666                 RETURN(NULL);
667         }
668
669         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670         export = class_handle2object(conn->cookie);
671         RETURN(export);
672 }
673
674 struct obd_device *class_exp2obd(struct obd_export *exp)
675 {
676         if (exp)
677                 return exp->exp_obd;
678         return NULL;
679 }
680
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 {
683         struct obd_export *export;
684         export = class_conn2export(conn);
685         if (export) {
686                 struct obd_device *obd = export->exp_obd;
687                 class_export_put(export);
688                 return obd;
689         }
690         return NULL;
691 }
692
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 {
695         struct obd_device *obd = exp->exp_obd;
696         if (obd == NULL)
697                 return NULL;
698         return obd->u.cli.cl_import;
699 }
700
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 {
703         struct obd_device *obd = class_conn2obd(conn);
704         if (obd == NULL)
705                 return NULL;
706         return obd->u.cli.cl_import;
707 }
708
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
711 {
712         struct obd_device *obd = exp->exp_obd;
713         ENTRY;
714
715         LASSERT (cfs_atomic_read(&exp->exp_refcount) == 0);
716
717         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718                exp->exp_client_uuid.uuid, obd->obd_name);
719
720         LASSERT(obd != NULL);
721
722         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723         if (exp->exp_connection)
724                 ptlrpc_put_connection_superhack(exp->exp_connection);
725
726         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
727         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
728         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
729         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
730         obd_destroy_export(exp);
731         class_decref(obd, "export", exp);
732
733         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
734         EXIT;
735 }
736
/* Handle-layer addref callback (passed to class_handle_hash() by
 * class_new_export()): takes a reference on the export. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741
742 struct obd_export *class_export_get(struct obd_export *exp)
743 {
744         cfs_atomic_inc(&exp->exp_refcount);
745         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746                cfs_atomic_read(&exp->exp_refcount));
747         return exp;
748 }
749 EXPORT_SYMBOL(class_export_get);
750
751 void class_export_put(struct obd_export *exp)
752 {
753         LASSERT(exp != NULL);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                cfs_atomic_read(&exp->exp_refcount) - 1);
756         LASSERT(cfs_atomic_read(&exp->exp_refcount) > 0);
757         LASSERT(cfs_atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758
759         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
760                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
761                 CDEBUG(D_IOCTL, "final put %p/%s\n",
762                        exp, exp->exp_client_uuid.uuid);
763                 obd_zombie_export_add(exp);
764         }
765 }
766 EXPORT_SYMBOL(class_export_put);
767
768 /* Creates a new export, adds it to the hash table, and returns a
769  * pointer to it. The refcount is 2: one for the hash reference, and
770  * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772                                     struct obd_uuid *cluuid)
773 {
774         struct obd_export *export;
775         int rc = 0;
776         ENTRY;
777
778         OBD_ALLOC_PTR(export);
779         if (!export)
780                 return ERR_PTR(-ENOMEM);
781
782         export->exp_conn_cnt = 0;
783         export->exp_lock_hash = NULL;
784         cfs_atomic_set(&export->exp_refcount, 2);
785         cfs_atomic_set(&export->exp_rpc_count, 0);
786         cfs_atomic_set(&export->exp_cb_count, 0);
787         cfs_atomic_set(&export->exp_locks_count, 0);
788 #if LUSTRE_TRACKS_LOCK_EXP_REFS
789         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
790         cfs_spin_lock_init(&export->exp_locks_list_guard);
791 #endif
792         cfs_atomic_set(&export->exp_replay_count, 0);
793         export->exp_obd = obd;
794         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
795         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
796         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
797         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
798         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
799         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
800         class_handle_hash(&export->exp_handle, export_handle_addref);
801         export->exp_last_request_time = cfs_time_current_sec();
802         cfs_spin_lock_init(&export->exp_lock);
803         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
804         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
805
806         export->exp_sp_peer = LUSTRE_SP_ANY;
807         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
808         export->exp_client_uuid = *cluuid;
809         obd_init_export(export);
810
811         cfs_spin_lock(&obd->obd_dev_lock);
812          /* shouldn't happen, but might race */
813         if (obd->obd_stopping)
814                 GOTO(exit_err, rc = -ENODEV);
815
816         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
817                 rc = cfs_hash_add_unique(obd->obd_uuid_hash, cluuid,
818                                          &export->exp_uuid_hash);
819                 if (rc != 0) {
820                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
821                                       obd->obd_name, cluuid->uuid, rc);
822                         GOTO(exit_err, rc = -EALREADY);
823                 }
824         }
825
826         class_incref(obd, "export", export);
827         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
828         cfs_list_add_tail(&export->exp_obd_chain_timed,
829                           &export->exp_obd->obd_exports_timed);
830         export->exp_obd->obd_num_exports++;
831         cfs_spin_unlock(&obd->obd_dev_lock);
832         RETURN(export);
833
834 exit_err:
835         cfs_spin_unlock(&obd->obd_dev_lock);
836         class_handle_unhash(&export->exp_handle);
837         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
838         obd_destroy_export(export);
839         OBD_FREE_PTR(export);
840         return ERR_PTR(rc);
841 }
842 EXPORT_SYMBOL(class_new_export);
843
844 void class_unlink_export(struct obd_export *exp)
845 {
846         class_handle_unhash(&exp->exp_handle);
847
848         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
849         /* delete an uuid-export hashitem from hashtables */
850         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
851                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
852                              &exp->exp_client_uuid,
853                              &exp->exp_uuid_hash);
854
855         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
856         cfs_list_del_init(&exp->exp_obd_chain_timed);
857         exp->exp_obd->obd_num_exports--;
858         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
859         class_export_put(exp);
860 }
861 EXPORT_SYMBOL(class_unlink_export);
862
863 /* Import management functions */
864 void class_import_destroy(struct obd_import *imp)
865 {
866         ENTRY;
867
868         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
869                 imp->imp_obd->obd_name);
870
871         LASSERT(cfs_atomic_read(&imp->imp_refcount) == 0);
872
873         ptlrpc_put_connection_superhack(imp->imp_connection);
874
875         while (!cfs_list_empty(&imp->imp_conn_list)) {
876                 struct obd_import_conn *imp_conn;
877
878                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
879                                           struct obd_import_conn, oic_item);
880                 cfs_list_del_init(&imp_conn->oic_item);
881                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
882                 OBD_FREE(imp_conn, sizeof(*imp_conn));
883         }
884
885         LASSERT(imp->imp_sec == NULL);
886         class_decref(imp->imp_obd, "import", imp);
887         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
888         EXIT;
889 }
890
/**
 * Address-reference callback registered with class_handle_hash() for
 * imports (see class_new_import()); takes one reference on \a import
 * via class_import_get().
 */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
895
/**
 * Take one additional reference on \a import.
 *
 * \retval the same \a import pointer, for call chaining.
 */
struct obd_import *class_import_get(struct obd_import *import)
{
        /* sanity-check the counter before touching it; the upper bound
         * presumably catches poisoned/freed memory -- TODO confirm */
        LASSERT(cfs_atomic_read(&import->imp_refcount) >= 0);
        LASSERT(cfs_atomic_read(&import->imp_refcount) < 0x5a5a5a);
        cfs_atomic_inc(&import->imp_refcount);
        /* the value logged is read after the increment */
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
               cfs_atomic_read(&import->imp_refcount),
               import->imp_obd->obd_name);
        return import;
}
EXPORT_SYMBOL(class_import_get);
907
908 void class_import_put(struct obd_import *imp)
909 {
910         ENTRY;
911
912         LASSERT(cfs_atomic_read(&imp->imp_refcount) > 0);
913         LASSERT(cfs_atomic_read(&imp->imp_refcount) < 0x5a5a5a);
914         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
915
916         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
917                cfs_atomic_read(&imp->imp_refcount) - 1,
918                imp->imp_obd->obd_name);
919
920         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
921                 CDEBUG(D_INFO, "final put import %p\n", imp);
922                 obd_zombie_import_add(imp);
923         }
924
925         EXIT;
926 }
927 EXPORT_SYMBOL(class_import_put);
928
929 static void init_imp_at(struct imp_at *at) {
930         int i;
931         at_init(&at->iat_net_latency, 0, 0);
932         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
933                 /* max service estimates are tracked on the server side, so
934                    don't use the AT history here, just use the last reported
935                    val. (But keep hist for proc histogram, worst_ever) */
936                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
937                         AT_FLG_NOHIST);
938         }
939 }
940
/**
 * Allocate and initialise a new import attached to \a obd.
 *
 * Takes an "import" reference on \a obd (class_incref()), registers the
 * import handle with class_handle_hash() and starts the import in state
 * LUSTRE_IMP_NEW with a reference count of 2.
 *
 * \retval new import on success
 * \retval NULL if the allocation fails
 */
struct obd_import *class_new_import(struct obd_device *obd)
{
        struct obd_import *imp;

        OBD_ALLOC(imp, sizeof(*imp));
        if (imp == NULL)
                return NULL;

        CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
        CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
        CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
        CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
        cfs_spin_lock_init(&imp->imp_lock);
        imp->imp_last_success_conn = 0;
        imp->imp_state = LUSTRE_IMP_NEW;
        /* pin the device for as long as the import exists; the matching
         * class_decref() is in class_import_destroy() */
        imp->imp_obd = class_incref(obd, "import", imp);
        cfs_sema_init(&imp->imp_sec_mutex, 1);
        cfs_waitq_init(&imp->imp_recovery_waitq);

        /* NOTE(review): presumably one reference for the caller and one
         * dropped by class_destroy_import() -- confirm */
        cfs_atomic_set(&imp->imp_refcount, 2);
        cfs_atomic_set(&imp->imp_unregistering, 0);
        cfs_atomic_set(&imp->imp_inflight, 0);
        cfs_atomic_set(&imp->imp_replay_inflight, 0);
        cfs_atomic_set(&imp->imp_inval_count, 0);
        CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
        CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
        class_handle_hash(&imp->imp_handle, import_handle_addref);
        init_imp_at(&imp->imp_at);

        /* the default magic is V2, will be used in connect RPC, and
         * then adjusted according to the flags in request/reply. */
        imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;

        return imp;
}
EXPORT_SYMBOL(class_new_import);
977
/**
 * Begin destruction of \a import: unhash its handle, bump imp_generation
 * under imp_lock and drop one reference.  The import itself is only freed
 * once the last reference is put (see class_import_put()).
 */
void class_destroy_import(struct obd_import *import)
{
        LASSERT(import != NULL);
        LASSERT(import != LP_POISON);

        /* after this the import can no longer be found through its handle */
        class_handle_unhash(&import->imp_handle);

        cfs_spin_lock(&import->imp_lock);
        import->imp_generation++;
        cfs_spin_unlock(&import->imp_lock);
        class_import_put(import);
}
EXPORT_SYMBOL(class_destroy_import);
991
992 #if LUSTRE_TRACKS_LOCK_EXP_REFS
993
994 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
995 {
996         cfs_spin_lock(&exp->exp_locks_list_guard);
997
998         LASSERT(lock->l_exp_refs_nr >= 0);
999
1000         if (lock->l_exp_refs_target != NULL &&
1001             lock->l_exp_refs_target != exp) {
1002                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1003                               exp, lock, lock->l_exp_refs_target);
1004         }
1005         if ((lock->l_exp_refs_nr ++) == 0) {
1006                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1007                 lock->l_exp_refs_target = exp;
1008         }
1009         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1010                lock, exp, lock->l_exp_refs_nr);
1011         cfs_spin_unlock(&exp->exp_locks_list_guard);
1012 }
1013 EXPORT_SYMBOL(__class_export_add_lock_ref);
1014
1015 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1016 {
1017         cfs_spin_lock(&exp->exp_locks_list_guard);
1018         LASSERT(lock->l_exp_refs_nr > 0);
1019         if (lock->l_exp_refs_target != exp) {
1020                 LCONSOLE_WARN("lock %p, "
1021                               "mismatching export pointers: %p, %p\n",
1022                               lock, lock->l_exp_refs_target, exp);
1023         }
1024         if (-- lock->l_exp_refs_nr == 0) {
1025                 cfs_list_del_init(&lock->l_exp_refs_link);
1026                 lock->l_exp_refs_target = NULL;
1027         }
1028         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1029                lock, exp, lock->l_exp_refs_nr);
1030         cfs_spin_unlock(&exp->exp_locks_list_guard);
1031 }
1032 EXPORT_SYMBOL(__class_export_del_lock_ref);
1033 #endif
1034
1035 /* A connection defines an export context in which preallocation can
1036    be managed. This releases the export pointer reference, and returns
1037    the export handle, so the export refcount is 1 when this function
1038    returns. */
1039 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1040                   struct obd_uuid *cluuid)
1041 {
1042         struct obd_export *export;
1043         LASSERT(conn != NULL);
1044         LASSERT(obd != NULL);
1045         LASSERT(cluuid != NULL);
1046         ENTRY;
1047
1048         export = class_new_export(obd, cluuid);
1049         if (IS_ERR(export))
1050                 RETURN(PTR_ERR(export));
1051
1052         conn->cookie = export->exp_handle.h_cookie;
1053         class_export_put(export);
1054
1055         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1056                cluuid->uuid, conn->cookie);
1057         RETURN(0);
1058 }
1059 EXPORT_SYMBOL(class_connect);
1060
/**
 * If the export is involved in recovery, clean up the related state.
 *
 * Runs entirely under obd_processing_task_lock (bottom halves disabled).
 * Each per-export flag is cleared under exp_lock and the matching
 * per-device client counter is decremented:
 *  - exp_delayed              -> obd_delayed_clients
 *  - exp_in_recovery          -> obd_connected_clients (only while the
 *                                device is recovering)
 *  - exp_req_replay_needed    -> obd_req_replay_clients
 *  - exp_lock_replay_needed   -> obd_lock_replay_clients
 */
void class_export_recovery_cleanup(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;

        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
        /* note: exp_delayed itself is left set here, only the count drops */
        if (exp->exp_delayed)
                obd->obd_delayed_clients--;
        if (obd->obd_recovering && exp->exp_in_recovery) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_in_recovery = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(obd->obd_connected_clients);
                obd->obd_connected_clients--;
        }
        /** Cleanup req replay fields */
        if (exp->exp_req_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_req_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
                cfs_atomic_dec(&obd->obd_req_replay_clients);
        }
        /** Cleanup lock replay data */
        if (exp->exp_lock_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_lock_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
                cfs_atomic_dec(&obd->obd_lock_replay_clients);
        }
        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
}
1094
1095 /* This function removes 1-3 references from the export:
1096  * 1 - for export pointer passed
1097  * and if disconnect really need
1098  * 2 - removing from hash
1099  * 3 - in client_unlink_export
1100  * The export pointer passed to this function can destroyed */
1101 int class_disconnect(struct obd_export *export)
1102 {
1103         int already_disconnected;
1104         ENTRY;
1105
1106         if (export == NULL) {
1107                 fixme();
1108                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1109                 RETURN(-EINVAL);
1110         }
1111
1112         cfs_spin_lock(&export->exp_lock);
1113         already_disconnected = export->exp_disconnected;
1114         export->exp_disconnected = 1;
1115         cfs_spin_unlock(&export->exp_lock);
1116
1117         /* class_cleanup(), abort_recovery(), and class_fail_export()
1118          * all end up in here, and if any of them race we shouldn't
1119          * call extra class_export_puts(). */
1120         if (already_disconnected) {
1121                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1122                 GOTO(no_disconn, already_disconnected);
1123         }
1124
1125         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1126                export->exp_handle.h_cookie);
1127
1128         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1129                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1130                              &export->exp_connection->c_peer.nid,
1131                              &export->exp_nid_hash);
1132
1133         class_export_recovery_cleanup(export);
1134         class_unlink_export(export);
1135 no_disconn:
1136         class_export_put(export);
1137         RETURN(0);
1138 }
1139
1140 /* Return non-zero for a fully connected export */
1141 int class_connected_export(struct obd_export *exp)
1142 {
1143         if (exp) {
1144                 int connected;
1145                 cfs_spin_lock(&exp->exp_lock);
1146                 connected = (exp->exp_conn_cnt > 0);
1147                 cfs_spin_unlock(&exp->exp_lock);
1148                 return connected;
1149         }
1150         return 0;
1151 }
1152 EXPORT_SYMBOL(class_connected_export);
1153
/**
 * Disconnect every export on the private work list \a list.
 *
 * Each export has its exp_flags overwritten with \a flags and is then
 * passed to obd_disconnect().  An export whose client uuid equals the
 * device uuid (the self export) is not disconnected, only unlinked from
 * the list.  The loop ends when \a list is empty, so it relies on each
 * iteration removing the head entry from it.
 */
static void class_disconnect_export_list(cfs_list_t *list,
                                         enum obd_option flags)
{
        int rc;
        struct obd_export *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!cfs_list_empty(list)) {
                exp = cfs_list_entry(list->next, struct obd_export,
                                     exp_obd_chain);
                /* need for safe call CDEBUG after obd_disconnect */
                class_export_get(exp);

                cfs_spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                cfs_spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        cfs_list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* second reference: this is the one obd_disconnect() is
                 * expected to consume (see "release one export reference
                 * anyway" below) */
                class_export_get(exp);
                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at "CFS_TIME_T"\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                /* release one export reference anyway */
                rc = obd_disconnect(exp);

                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
                       obd_export_nid2str(exp), exp, rc);
                /* drop the reference taken at the top of the loop */
                class_export_put(exp);
        }
        EXIT;
}
1199
1200 void class_disconnect_exports(struct obd_device *obd)
1201 {
1202         cfs_list_t work_list;
1203         ENTRY;
1204
1205         /* Move all of the exports from obd_exports to a work list, en masse. */
1206         CFS_INIT_LIST_HEAD(&work_list);
1207         cfs_spin_lock(&obd->obd_dev_lock);
1208         cfs_list_splice_init(&obd->obd_exports, &work_list);
1209         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1210         cfs_spin_unlock(&obd->obd_dev_lock);
1211
1212         if (!cfs_list_empty(&work_list)) {
1213                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1214                        "disconnecting them\n", obd->obd_minor, obd);
1215                 class_disconnect_export_list(&work_list,
1216                                              exp_flags_from_obd(obd));
1217         } else
1218                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1219                        obd->obd_minor, obd);
1220         EXIT;
1221 }
1222 EXPORT_SYMBOL(class_disconnect_exports);
1223
1224 /* Remove exports that have not completed recovery.
1225  */
1226 void class_disconnect_stale_exports(struct obd_device *obd,
1227                                     int (*test_export)(struct obd_export *))
1228 {
1229         cfs_list_t work_list;
1230         cfs_list_t *pos, *n;
1231         struct obd_export *exp;
1232         int evicted = 0;
1233         ENTRY;
1234
1235         CFS_INIT_LIST_HEAD(&work_list);
1236         cfs_spin_lock(&obd->obd_dev_lock);
1237         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1238                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1239                 if (test_export(exp))
1240                         continue;
1241
1242                 /* don't count self-export as client */
1243                 if (obd_uuid_equals(&exp->exp_client_uuid,
1244                                     &exp->exp_obd->obd_uuid))
1245                         continue;
1246
1247                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1248                 evicted++;
1249                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1250                        obd->obd_name, exp->exp_client_uuid.uuid,
1251                        exp->exp_connection == NULL ? "<unknown>" :
1252                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1253                 print_export_data(exp, "EVICTING", 0);
1254         }
1255         cfs_spin_unlock(&obd->obd_dev_lock);
1256
1257         if (evicted) {
1258                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1259                        obd->obd_name, evicted);
1260                 obd->obd_stale_clients += evicted;
1261         }
1262         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1263                                                  OBD_OPT_ABORT_RECOV);
1264         EXIT;
1265 }
1266 EXPORT_SYMBOL(class_disconnect_stale_exports);
1267
1268 void class_fail_export(struct obd_export *exp)
1269 {
1270         int rc, already_failed;
1271
1272         cfs_spin_lock(&exp->exp_lock);
1273         already_failed = exp->exp_failed;
1274         exp->exp_failed = 1;
1275         cfs_spin_unlock(&exp->exp_lock);
1276
1277         if (already_failed) {
1278                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1279                        exp, exp->exp_client_uuid.uuid);
1280                 return;
1281         }
1282
1283         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1284                exp, exp->exp_client_uuid.uuid);
1285
1286         if (obd_dump_on_timeout)
1287                 libcfs_debug_dumplog();
1288
1289         /* Most callers into obd_disconnect are removing their own reference
1290          * (request, for example) in addition to the one from the hash table.
1291          * We don't have such a reference here, so make one. */
1292         class_export_get(exp);
1293         rc = obd_disconnect(exp);
1294         if (rc)
1295                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1296         else
1297                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1298                        exp, exp->exp_client_uuid.uuid);
1299 }
1300 EXPORT_SYMBOL(class_fail_export);
1301
1302 char *obd_export_nid2str(struct obd_export *exp)
1303 {
1304         if (exp->exp_connection != NULL)
1305                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1306
1307         return "(no nid)";
1308 }
1309 EXPORT_SYMBOL(obd_export_nid2str);
1310
1311 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1312 {
1313         struct obd_export *doomed_exp = NULL;
1314         int exports_evicted = 0;
1315
1316         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1317
1318         do {
1319                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1320                 if (doomed_exp == NULL)
1321                         break;
1322
1323                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1324                          "nid %s found, wanted nid %s, requested nid %s\n",
1325                          obd_export_nid2str(doomed_exp),
1326                          libcfs_nid2str(nid_key), nid);
1327                 LASSERTF(doomed_exp != obd->obd_self_export,
1328                          "self-export is hashed by NID?\n");
1329                 exports_evicted++;
1330                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1331                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1332                        exports_evicted);
1333                 class_fail_export(doomed_exp);
1334                 class_export_put(doomed_exp);
1335         } while (1);
1336
1337         if (!exports_evicted)
1338                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1339                        obd->obd_name, nid);
1340         return exports_evicted;
1341 }
1342 EXPORT_SYMBOL(obd_export_evict_by_nid);
1343
1344 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1345 {
1346         struct obd_export *doomed_exp = NULL;
1347         struct obd_uuid doomed_uuid;
1348         int exports_evicted = 0;
1349
1350         obd_str2uuid(&doomed_uuid, uuid);
1351         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1352                 CERROR("%s: can't evict myself\n", obd->obd_name);
1353                 return exports_evicted;
1354         }
1355
1356         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1357
1358         if (doomed_exp == NULL) {
1359                 CERROR("%s: can't disconnect %s: no exports found\n",
1360                        obd->obd_name, uuid);
1361         } else {
1362                 CWARN("%s: evicting %s at adminstrative request\n",
1363                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1364                 class_fail_export(doomed_exp);
1365                 class_export_put(doomed_exp);
1366                 exports_evicted++;
1367         }
1368
1369         return exports_evicted;
1370 }
1371 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1372
1373 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; NULL until something installs one.
 * print_export_data() invokes it when its 'locks' argument is non-zero. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
1376 #endif
1377
1378 static void print_export_data(struct obd_export *exp, const char *status,
1379                               int locks)
1380 {
1381         struct ptlrpc_reply_state *rs;
1382         struct ptlrpc_reply_state *first_reply = NULL;
1383         int nreplies = 0;
1384
1385         cfs_spin_lock(&exp->exp_lock);
1386         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1387                                 rs_exp_list) {
1388                 if (nreplies == 0)
1389                         first_reply = rs;
1390                 nreplies++;
1391         }
1392         cfs_spin_unlock(&exp->exp_lock);
1393
1394         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1395                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1396                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1397                cfs_atomic_read(&exp->exp_rpc_count),
1398                cfs_atomic_read(&exp->exp_cb_count),
1399                cfs_atomic_read(&exp->exp_locks_count),
1400                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1401                nreplies, first_reply, nreplies > 3 ? "..." : "",
1402                exp->exp_last_committed);
1403 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1404         if (locks && class_export_dump_hook != NULL)
1405                 class_export_dump_hook(exp);
1406 #endif
1407 }
1408
1409 void dump_exports(struct obd_device *obd, int locks)
1410 {
1411         struct obd_export *exp;
1412
1413         cfs_spin_lock(&obd->obd_dev_lock);
1414         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1415                 print_export_data(exp, "ACTIVE", locks);
1416         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1417                 print_export_data(exp, "UNLINKED", locks);
1418         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1419                 print_export_data(exp, "DELAYED", locks);
1420         cfs_spin_unlock(&obd->obd_dev_lock);
1421         cfs_spin_lock(&obd_zombie_impexp_lock);
1422         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1423                 print_export_data(exp, "ZOMBIE", locks);
1424         cfs_spin_unlock(&obd_zombie_impexp_lock);
1425 }
1426 EXPORT_SYMBOL(dump_exports);
1427
1428 void obd_exports_barrier(struct obd_device *obd)
1429 {
1430         int waited = 2;
1431         LASSERT(cfs_list_empty(&obd->obd_exports));
1432         cfs_spin_lock(&obd->obd_dev_lock);
1433         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1434                 cfs_spin_unlock(&obd->obd_dev_lock);
1435                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1436                                                    cfs_time_seconds(waited));
1437                 if (waited > 5 && IS_PO2(waited)) {
1438                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1439                                       "more than %d seconds. "
1440                                       "The obd refcount = %d. Is it stuck?\n",
1441                                       obd->obd_name, waited,
1442                                       cfs_atomic_read(&obd->obd_refcount));
1443                         dump_exports(obd, 0);
1444                 }
1445                 waited *= 2;
1446                 cfs_spin_lock(&obd->obd_dev_lock);
1447         }
1448         cfs_spin_unlock(&obd->obd_dev_lock);
1449 }
1450 EXPORT_SYMBOL(obd_exports_barrier);
1451
/**
 * Kill zombie imports and exports.
 *
 * Each pass takes at most one import and one export off the heads of the
 * zombie lists under obd_zombie_impexp_lock, then destroys them with the
 * lock released.  The loop repeats, yielding the CPU between passes, until
 * a pass finds both lists empty.
 */
void obd_zombie_impexp_cull(void)
{
        struct obd_import *import;
        struct obd_export *export;
        ENTRY;

        do {
                cfs_spin_lock(&obd_zombie_impexp_lock);

                import = NULL;
                if (!cfs_list_empty(&obd_zombie_imports)) {
                        import = cfs_list_entry(obd_zombie_imports.next,
                                                struct obd_import,
                                                imp_zombie_chain);
                        cfs_list_del_init(&import->imp_zombie_chain);
                }

                export = NULL;
                if (!cfs_list_empty(&obd_zombie_exports)) {
                        export = cfs_list_entry(obd_zombie_exports.next,
                                                struct obd_export,
                                                exp_obd_chain);
                        cfs_list_del_init(&export->exp_obd_chain);
                }

                cfs_spin_unlock(&obd_zombie_impexp_lock);

                /* destruction happens outside the spinlock */
                if (import != NULL)
                        class_import_destroy(import);

                if (export != NULL)
                        class_export_destroy(export);

                cfs_cond_resched();
        } while (import != NULL || export != NULL);
        EXIT;
}
1492
/* completed by the zombie thread once it has started (or failed to);
 * obd_zombie_impexp_init() waits on it */
static cfs_completion_t         obd_zombie_start;
/* completed by the zombie thread just before it exits;
 * obd_zombie_impexp_stop() waits on it */
static cfs_completion_t         obd_zombie_stop;
/* bit flags controlling the zombie thread, see OBD_ZOMBIE_STOP */
static unsigned long            obd_zombie_flags;
/* signalled when a zombie is queued and after each cull pass */
static cfs_waitq_t              obd_zombie_waitq;
/* pid of the zombie thread, 0 until the thread has recorded it */
static pid_t                    obd_zombie_pid;

enum {
        /* Passed to cfs_set_bit()/cfs_test_bit() on obd_zombie_flags to ask
         * the zombie thread to exit.
         * NOTE(review): if those helpers take a bit index, 1 << 1 selects
         * bit 2 rather than bit 1; harmless while every user goes through
         * this name, but confirm the intent. */
        OBD_ZOMBIE_STOP   = 1 << 1
};
1502
1503 /**
1504  * check for work for kill zombie import/export thread.
1505  */
1506 static int obd_zombie_impexp_check(void *arg)
1507 {
1508         int rc;
1509
1510         cfs_spin_lock(&obd_zombie_impexp_lock);
1511         rc = cfs_list_empty(&obd_zombie_imports) &&
1512              cfs_list_empty(&obd_zombie_exports) &&
1513              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1514
1515         cfs_spin_unlock(&obd_zombie_impexp_lock);
1516
1517         RETURN(rc);
1518 }
1519
1520 /**
1521  * Add export to the obd_zombe thread and notify it.
1522  */
1523 static void obd_zombie_export_add(struct obd_export *exp) {
1524         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1525         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1526         cfs_list_del_init(&exp->exp_obd_chain);
1527         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1528         cfs_spin_lock(&obd_zombie_impexp_lock);
1529         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1530         cfs_spin_unlock(&obd_zombie_impexp_lock);
1531
1532         if (obd_zombie_impexp_notify != NULL)
1533                 obd_zombie_impexp_notify();
1534 }
1535
1536 /**
1537  * Add import to the obd_zombe thread and notify it.
1538  */
1539 static void obd_zombie_import_add(struct obd_import *imp) {
1540         LASSERT(imp->imp_sec == NULL);
1541         cfs_spin_lock(&obd_zombie_impexp_lock);
1542         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1543         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1544         cfs_spin_unlock(&obd_zombie_impexp_lock);
1545
1546         if (obd_zombie_impexp_notify != NULL)
1547                 obd_zombie_impexp_notify();
1548 }
1549
/**
 * Notify the import/export destroy thread about a new zombie (or a stop
 * request) by signalling obd_zombie_waitq.
 */
static void obd_zombie_impexp_notify(void)
{
        cfs_waitq_signal(&obd_zombie_waitq);
}
1557
1558 /**
1559  * check whether obd_zombie is idle
1560  */
1561 static int obd_zombie_is_idle(void)
1562 {
1563         int rc;
1564
1565         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1566         cfs_spin_lock(&obd_zombie_impexp_lock);
1567         rc = cfs_list_empty(&obd_zombie_imports) &&
1568              cfs_list_empty(&obd_zombie_exports);
1569         cfs_spin_unlock(&obd_zombie_impexp_lock);
1570         return rc;
1571 }
1572
1573 /**
1574  * wait when obd_zombie import/export queues become empty
1575  */
1576 void obd_zombie_barrier(void)
1577 {
1578         struct l_wait_info lwi = { 0 };
1579
1580         if (obd_zombie_pid == cfs_curproc_pid())
1581                 /* don't wait for myself */
1582                 return;
1583         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1584 }
1585 EXPORT_SYMBOL(obd_zombie_barrier);
1586
1587 #ifdef __KERNEL__
1588
1589 /**
1590  * destroy zombie export/import thread.
1591  */
1592 static int obd_zombie_impexp_thread(void *unused)
1593 {
1594         int rc;
1595
1596         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1597                 cfs_complete(&obd_zombie_start);
1598                 RETURN(rc);
1599         }
1600
1601         cfs_complete(&obd_zombie_start);
1602
1603         obd_zombie_pid = cfs_curproc_pid();
1604
1605         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1606                 struct l_wait_info lwi = { 0 };
1607
1608                 l_wait_event(obd_zombie_waitq,
1609                              !obd_zombie_impexp_check(NULL), &lwi);
1610                 obd_zombie_impexp_cull();
1611
1612                 /*
1613                  * Notify obd_zombie_barrier callers that queues
1614                  * may be empty.
1615                  */
1616                 cfs_waitq_signal(&obd_zombie_waitq);
1617         }
1618
1619         cfs_complete(&obd_zombie_stop);
1620
1621         RETURN(0);
1622 }
1623
1624 #else /* ! KERNEL */
1625
1626 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1627 static void *obd_zombie_impexp_work_cb;
1628 static void *obd_zombie_impexp_idle_cb;
1629
1630 int obd_zombie_impexp_kill(void *arg)
1631 {
1632         int rc = 0;
1633
1634         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1635                 obd_zombie_impexp_cull();
1636                 rc = 1;
1637         }
1638         cfs_atomic_dec(&zombie_recur);
1639         return rc;
1640 }
1641
1642 #endif
1643
1644 /**
1645  * start destroy zombie import/export thread
1646  */
1647 int obd_zombie_impexp_init(void)
1648 {
1649         int rc;
1650
1651         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1652         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1653         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1654         cfs_init_completion(&obd_zombie_start);
1655         cfs_init_completion(&obd_zombie_stop);
1656         cfs_waitq_init(&obd_zombie_waitq);
1657         obd_zombie_pid = 0;
1658
1659 #ifdef __KERNEL__
1660         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1661         if (rc < 0)
1662                 RETURN(rc);
1663
1664         cfs_wait_for_completion(&obd_zombie_start);
1665 #else
1666
1667         obd_zombie_impexp_work_cb =
1668                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1669                                                  &obd_zombie_impexp_kill, NULL);
1670
1671         obd_zombie_impexp_idle_cb =
1672                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1673                                                  &obd_zombie_impexp_check, NULL);
1674         rc = 0;
1675 #endif
1676         RETURN(rc);
1677 }
/**
 * Stop zombie import/export handling.
 *
 * Sets OBD_ZOMBIE_STOP and wakes the zombie wait queue.  In the kernel
 * this then blocks until the zombie thread has signalled obd_zombie_stop;
 * outside the kernel the liblustre callbacks registered by
 * obd_zombie_impexp_init() are deregistered.
 */
void obd_zombie_impexp_stop(void)
{
        cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
#ifdef __KERNEL__
        cfs_wait_for_completion(&obd_zombie_stop);
#else
        liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
        liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
#endif
}
1692