Whamcloud - gitweb
b=15599 hsm infrastructure
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83 EXPORT_SYMBOL(obd_device_alloc);
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149
150 #define CLASS_MAX_NAME 1024
151
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153                         struct lprocfs_vars *vars, const char *name,
154                         struct lu_device_type *ldt)
155 {
156         struct obd_type *type;
157         int rc = 0;
158         ENTRY;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 RETURN(-EEXIST);
166         }
167
168         rc = -ENOMEM;
169         OBD_ALLOC(type, sizeof(*type));
170         if (type == NULL)
171                 RETURN(rc);
172
173         OBD_ALLOC_PTR(type->typ_dt_ops);
174         OBD_ALLOC_PTR(type->typ_md_ops);
175         OBD_ALLOC(type->typ_name, strlen(name) + 1);
176
177         if (type->typ_dt_ops == NULL ||
178             type->typ_md_ops == NULL ||
179             type->typ_name == NULL)
180                 GOTO (failed, rc);
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         cfs_spin_lock_init(&type->obd_type_lock);
188
189 #ifdef LPROCFS
190         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191                                               vars, type);
192         if (IS_ERR(type->typ_procroot)) {
193                 rc = PTR_ERR(type->typ_procroot);
194                 type->typ_procroot = NULL;
195                 GOTO (failed, rc);
196         }
197 #endif
198         if (ldt != NULL) {
199                 type->typ_lu = ldt;
200                 rc = lu_device_type_init(ldt);
201                 if (rc != 0)
202                         GOTO (failed, rc);
203         }
204
205         cfs_spin_lock(&obd_types_lock);
206         cfs_list_add(&type->typ_chain, &obd_types);
207         cfs_spin_unlock(&obd_types_lock);
208
209         RETURN (0);
210
211  failed:
212         if (type->typ_name != NULL)
213                 OBD_FREE(type->typ_name, strlen(name) + 1);
214         if (type->typ_md_ops != NULL)
215                 OBD_FREE_PTR(type->typ_md_ops);
216         if (type->typ_dt_ops != NULL)
217                 OBD_FREE_PTR(type->typ_dt_ops);
218         OBD_FREE(type, sizeof(*type));
219         RETURN(rc);
220 }
221
222 int class_unregister_type(const char *name)
223 {
224         struct obd_type *type = class_search_type(name);
225         ENTRY;
226
227         if (!type) {
228                 CERROR("unknown obd type\n");
229                 RETURN(-EINVAL);
230         }
231
232         if (type->typ_refcnt) {
233                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234                 /* This is a bad situation, let's make the best of it */
235                 /* Remove ops, but leave the name for debugging */
236                 OBD_FREE_PTR(type->typ_dt_ops);
237                 OBD_FREE_PTR(type->typ_md_ops);
238                 RETURN(-EBUSY);
239         }
240
241         if (type->typ_procroot) {
242                 lprocfs_remove(&type->typ_procroot);
243         }
244
245         if (type->typ_lu)
246                 lu_device_type_fini(type->typ_lu);
247
248         cfs_spin_lock(&obd_types_lock);
249         cfs_list_del(&type->typ_chain);
250         cfs_spin_unlock(&obd_types_lock);
251         OBD_FREE(type->typ_name, strlen(name) + 1);
252         if (type->typ_dt_ops != NULL)
253                 OBD_FREE_PTR(type->typ_dt_ops);
254         if (type->typ_md_ops != NULL)
255                 OBD_FREE_PTR(type->typ_md_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(0);
258 } /* class_unregister_type */
259
260 /**
261  * Create a new obd device.
262  *
263  * Find an empty slot in ::obd_devs[], create a new obd device in it.
264  *
265  * \param[in] type_name obd device type string.
266  * \param[in] name      obd device name.
267  *
268  * \retval NULL if create fails, otherwise return the obd device
269  *         pointer created.
270  */
271 struct obd_device *class_newdev(const char *type_name, const char *name)
272 {
273         struct obd_device *result = NULL;
274         struct obd_device *newdev;
275         struct obd_type *type = NULL;
276         int i;
277         int new_obd_minor = 0;
278
279         if (strlen(name) >= MAX_OBD_NAME) {
280                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281                 RETURN(ERR_PTR(-EINVAL));
282         }
283
284         type = class_get_type(type_name);
285         if (type == NULL){
286                 CERROR("OBD: unknown type: %s\n", type_name);
287                 RETURN(ERR_PTR(-ENODEV));
288         }
289
290         newdev = obd_device_alloc();
291         if (newdev == NULL) {
292                 class_put_type(type);
293                 RETURN(ERR_PTR(-ENOMEM));
294         }
295         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296
297         cfs_spin_lock(&obd_dev_lock);
298         for (i = 0; i < class_devno_max(); i++) {
299                 struct obd_device *obd = class_num2obd(i);
300                 if (obd && obd->obd_name &&
301                     (strcmp(name, obd->obd_name) == 0)) {
302                         CERROR("Device %s already exists, won't add\n", name);
303                         if (result) {
304                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305                                          "%p obd_magic %08x != %08x\n", result,
306                                          result->obd_magic, OBD_DEVICE_MAGIC);
307                                 LASSERTF(result->obd_minor == new_obd_minor,
308                                          "%p obd_minor %d != %d\n", result,
309                                          result->obd_minor, new_obd_minor);
310
311                                 obd_devs[result->obd_minor] = NULL;
312                                 result->obd_name[0]='\0';
313                          }
314                         result = ERR_PTR(-EEXIST);
315                         break;
316                 }
317                 if (!result && !obd) {
318                         result = newdev;
319                         result->obd_minor = i;
320                         new_obd_minor = i;
321                         result->obd_type = type;
322                         strncpy(result->obd_name, name,
323                                 sizeof(result->obd_name) - 1);
324                         obd_devs[i] = result;
325                 }
326         }
327         cfs_spin_unlock(&obd_dev_lock);
328
329         if (result == NULL && i >= class_devno_max()) {
330                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331                        class_devno_max());
332                 result = ERR_PTR(-EOVERFLOW);
333         }
334
335         if (IS_ERR(result)) {
336                 obd_device_free(newdev);
337                 class_put_type(type);
338         } else {
339                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340                        result->obd_name, result);
341         }
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type != NULL);
354
355         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356                obd->obd_name,obd->obd_type->typ_name);
357
358         cfs_spin_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         cfs_spin_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         cfs_spin_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377                         /* Make sure we finished attaching before we give
378                            out any references */
379                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380                         if (obd->obd_attached) {
381                                 cfs_spin_unlock(&obd_dev_lock);
382                                 return i;
383                         }
384                         break;
385                 }
386         }
387         cfs_spin_unlock(&obd_dev_lock);
388
389         return -1;
390 }
391
392 struct obd_device *class_name2obd(const char *name)
393 {
394         int dev = class_name2dev(name);
395
396         if (dev < 0 || dev > class_devno_max())
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 int class_uuid2dev(struct obd_uuid *uuid)
402 {
403         int i;
404
405         cfs_spin_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         cfs_spin_unlock(&obd_dev_lock);
411                         return i;
412                 }
413         }
414         cfs_spin_unlock(&obd_dev_lock);
415
416         return -1;
417 }
418
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 {
421         int dev = class_uuid2dev(uuid);
422         if (dev < 0)
423                 return NULL;
424         return class_num2obd(dev);
425 }
426
427 /**
428  * Get obd device from ::obd_devs[]
429  *
430  * \param num [in] array index
431  *
432  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433  *         otherwise return the obd device there.
434  */
435 struct obd_device *class_num2obd(int num)
436 {
437         struct obd_device *obd = NULL;
438
439         if (num < class_devno_max()) {
440                 obd = obd_devs[num];
441                 if (obd == NULL)
442                         return NULL;
443
444                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445                          "%p obd_magic %08x != %08x\n",
446                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447                 LASSERTF(obd->obd_minor == num,
448                          "%p obd_minor %0d != %0d\n",
449                          obd, obd->obd_minor, num);
450         }
451
452         return obd;
453 }
454
455 void class_obd_list(void)
456 {
457         char *status;
458         int i;
459
460         cfs_spin_lock(&obd_dev_lock);
461         for (i = 0; i < class_devno_max(); i++) {
462                 struct obd_device *obd = class_num2obd(i);
463                 if (obd == NULL)
464                         continue;
465                 if (obd->obd_stopping)
466                         status = "ST";
467                 else if (obd->obd_set_up)
468                         status = "UP";
469                 else if (obd->obd_attached)
470                         status = "AT";
471                 else
472                         status = "--";
473                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474                          i, status, obd->obd_type->typ_name,
475                          obd->obd_name, obd->obd_uuid.uuid,
476                          cfs_atomic_read(&obd->obd_refcount));
477         }
478         cfs_spin_unlock(&obd_dev_lock);
479         return;
480 }
481
482 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
483    specified, then only the client with that uuid is returned,
484    otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486                                           const char * typ_name,
487                                           struct obd_uuid *grp_uuid)
488 {
489         int i;
490
491         cfs_spin_lock(&obd_dev_lock);
492         for (i = 0; i < class_devno_max(); i++) {
493                 struct obd_device *obd = class_num2obd(i);
494                 if (obd == NULL)
495                         continue;
496                 if ((strncmp(obd->obd_type->typ_name, typ_name,
497                              strlen(typ_name)) == 0)) {
498                         if (obd_uuid_equals(tgt_uuid,
499                                             &obd->u.cli.cl_target_uuid) &&
500                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
501                                                          &obd->obd_uuid) : 1)) {
502                                 cfs_spin_unlock(&obd_dev_lock);
503                                 return obd;
504                         }
505                 }
506         }
507         cfs_spin_unlock(&obd_dev_lock);
508
509         return NULL;
510 }
511
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513    searching at *next, and if a device is found, the next index to look
514    at is saved in *next. If next is NULL, then the first matching device
515    will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
517 {
518         int i;
519
520         if (next == NULL)
521                 i = 0;
522         else if (*next >= 0 && *next < class_devno_max())
523                 i = *next;
524         else
525                 return NULL;
526
527         cfs_spin_lock(&obd_dev_lock);
528         for (; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530                 if (obd == NULL)
531                         continue;
532                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
533                         if (next != NULL)
534                                 *next = i+1;
535                         cfs_spin_unlock(&obd_dev_lock);
536                         return obd;
537                 }
538         }
539         cfs_spin_unlock(&obd_dev_lock);
540
541         return NULL;
542 }
543
544 /**
545  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546  * adjust sptlrpc settings accordingly.
547  */
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 {
550         struct obd_device  *obd;
551         const char         *type;
552         int                 i, rc = 0, rc2;
553
554         LASSERT(namelen > 0);
555
556         cfs_spin_lock(&obd_dev_lock);
557         for (i = 0; i < class_devno_max(); i++) {
558                 obd = class_num2obd(i);
559
560                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
561                         continue;
562
563                 /* only notify mdc, osc, mdt, ost */
564                 type = obd->obd_type->typ_name;
565                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OST_NAME) != 0)
569                         continue;
570
571                 if (strncmp(obd->obd_name, fsname, namelen))
572                         continue;
573
574                 class_incref(obd, __FUNCTION__, obd);
575                 cfs_spin_unlock(&obd_dev_lock);
576                 rc2 = obd_set_info_async(obd->obd_self_export,
577                                          sizeof(KEY_SPTLRPC_CONF),
578                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
579                 rc = rc ? rc : rc2;
580                 class_decref(obd, __FUNCTION__, obd);
581                 cfs_spin_lock(&obd_dev_lock);
582         }
583         cfs_spin_unlock(&obd_dev_lock);
584         return rc;
585 }
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587
588 void obd_cleanup_caches(void)
589 {
590         int rc;
591
592         ENTRY;
593         if (obd_device_cachep) {
594                 rc = cfs_mem_cache_destroy(obd_device_cachep);
595                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596                 obd_device_cachep = NULL;
597         }
598         if (obdo_cachep) {
599                 rc = cfs_mem_cache_destroy(obdo_cachep);
600                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601                 obdo_cachep = NULL;
602         }
603         if (import_cachep) {
604                 rc = cfs_mem_cache_destroy(import_cachep);
605                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606                 import_cachep = NULL;
607         }
608         if (capa_cachep) {
609                 rc = cfs_mem_cache_destroy(capa_cachep);
610                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
611                 capa_cachep = NULL;
612         }
613         EXIT;
614 }
615
616 int obd_init_caches(void)
617 {
618         ENTRY;
619
620         LASSERT(obd_device_cachep == NULL);
621         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622                                                  sizeof(struct obd_device),
623                                                  0, 0);
624         if (!obd_device_cachep)
625                 GOTO(out, -ENOMEM);
626
627         LASSERT(obdo_cachep == NULL);
628         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
629                                            0, 0);
630         if (!obdo_cachep)
631                 GOTO(out, -ENOMEM);
632
633         LASSERT(import_cachep == NULL);
634         import_cachep = cfs_mem_cache_create("ll_import_cache",
635                                              sizeof(struct obd_import),
636                                              0, 0);
637         if (!import_cachep)
638                 GOTO(out, -ENOMEM);
639
640         LASSERT(capa_cachep == NULL);
641         capa_cachep = cfs_mem_cache_create("capa_cache",
642                                            sizeof(struct obd_capa), 0, 0);
643         if (!capa_cachep)
644                 GOTO(out, -ENOMEM);
645
646         RETURN(0);
647  out:
648         obd_cleanup_caches();
649         RETURN(-ENOMEM);
650
651 }
652
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 {
656         struct obd_export *export;
657         ENTRY;
658
659         if (!conn) {
660                 CDEBUG(D_CACHE, "looking for null handle\n");
661                 RETURN(NULL);
662         }
663
664         if (conn->cookie == -1) {  /* this means assign a new connection */
665                 CDEBUG(D_CACHE, "want a new connection\n");
666                 RETURN(NULL);
667         }
668
669         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670         export = class_handle2object(conn->cookie);
671         RETURN(export);
672 }
673
674 struct obd_device *class_exp2obd(struct obd_export *exp)
675 {
676         if (exp)
677                 return exp->exp_obd;
678         return NULL;
679 }
680
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 {
683         struct obd_export *export;
684         export = class_conn2export(conn);
685         if (export) {
686                 struct obd_device *obd = export->exp_obd;
687                 class_export_put(export);
688                 return obd;
689         }
690         return NULL;
691 }
692
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 {
695         struct obd_device *obd = exp->exp_obd;
696         if (obd == NULL)
697                 return NULL;
698         return obd->u.cli.cl_import;
699 }
700
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 {
703         struct obd_device *obd = class_conn2obd(conn);
704         if (obd == NULL)
705                 return NULL;
706         return obd->u.cli.cl_import;
707 }
708
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
711 {
712         struct obd_device *obd = exp->exp_obd;
713         ENTRY;
714
715         LASSERT (cfs_atomic_read(&exp->exp_refcount) == 0);
716
717         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718                exp->exp_client_uuid.uuid, obd->obd_name);
719
720         LASSERT(obd != NULL);
721
722         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723         if (exp->exp_connection)
724                 ptlrpc_put_connection_superhack(exp->exp_connection);
725
726         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
727         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
728         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
729         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
730         obd_destroy_export(exp);
731         class_decref(obd, "export", exp);
732
733         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
734         EXIT;
735 }
736
/* Addref callback registered with class_handle_hash() for export handles;
 * takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741
742 struct obd_export *class_export_get(struct obd_export *exp)
743 {
744         cfs_atomic_inc(&exp->exp_refcount);
745         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746                cfs_atomic_read(&exp->exp_refcount));
747         return exp;
748 }
749 EXPORT_SYMBOL(class_export_get);
750
751 void class_export_put(struct obd_export *exp)
752 {
753         LASSERT(exp != NULL);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                cfs_atomic_read(&exp->exp_refcount) - 1);
756         LASSERT(cfs_atomic_read(&exp->exp_refcount) > 0);
757         LASSERT(cfs_atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758
759         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
760                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
761                 CDEBUG(D_IOCTL, "final put %p/%s\n",
762                        exp, exp->exp_client_uuid.uuid);
763
764                 /* release nid stat refererence */
765                 lprocfs_exp_cleanup(exp);
766
767                 obd_zombie_export_add(exp);
768         }
769 }
770 EXPORT_SYMBOL(class_export_put);
771
772 /* Creates a new export, adds it to the hash table, and returns a
773  * pointer to it. The refcount is 2: one for the hash reference, and
774  * one for the pointer returned by this function. */
775 struct obd_export *class_new_export(struct obd_device *obd,
776                                     struct obd_uuid *cluuid)
777 {
778         struct obd_export *export;
779         cfs_hash_t *hash = NULL;
780         int rc = 0;
781         ENTRY;
782
783         OBD_ALLOC_PTR(export);
784         if (!export)
785                 return ERR_PTR(-ENOMEM);
786
787         export->exp_conn_cnt = 0;
788         export->exp_lock_hash = NULL;
789         cfs_atomic_set(&export->exp_refcount, 2);
790         cfs_atomic_set(&export->exp_rpc_count, 0);
791         cfs_atomic_set(&export->exp_cb_count, 0);
792         cfs_atomic_set(&export->exp_locks_count, 0);
793 #if LUSTRE_TRACKS_LOCK_EXP_REFS
794         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
795         cfs_spin_lock_init(&export->exp_locks_list_guard);
796 #endif
797         cfs_atomic_set(&export->exp_replay_count, 0);
798         export->exp_obd = obd;
799         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
800         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
801         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
802         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
803         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
804         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
805         class_handle_hash(&export->exp_handle, export_handle_addref);
806         export->exp_last_request_time = cfs_time_current_sec();
807         cfs_spin_lock_init(&export->exp_lock);
808         cfs_spin_lock_init(&export->exp_rpc_lock);
809         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
810         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
811
812         export->exp_sp_peer = LUSTRE_SP_ANY;
813         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
814         export->exp_client_uuid = *cluuid;
815         obd_init_export(export);
816
817         cfs_spin_lock(&obd->obd_dev_lock);
818          /* shouldn't happen, but might race */
819         if (obd->obd_stopping)
820                 GOTO(exit_unlock, rc = -ENODEV);
821
822         hash = cfs_hash_getref(obd->obd_uuid_hash);
823         if (hash == NULL)
824                 GOTO(exit_unlock, rc = -ENODEV);
825         cfs_spin_unlock(&obd->obd_dev_lock);
826
827         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
828                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
829                 if (rc != 0) {
830                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
831                                       obd->obd_name, cluuid->uuid, rc);
832                         GOTO(exit_err, rc = -EALREADY);
833                 }
834         }
835
836         cfs_spin_lock(&obd->obd_dev_lock);
837         if (obd->obd_stopping) {
838                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
839                 GOTO(exit_unlock, rc = -ENODEV);
840         }
841
842         class_incref(obd, "export", export);
843         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
844         cfs_list_add_tail(&export->exp_obd_chain_timed,
845                           &export->exp_obd->obd_exports_timed);
846         export->exp_obd->obd_num_exports++;
847         cfs_spin_unlock(&obd->obd_dev_lock);
848         cfs_hash_putref(hash);
849         RETURN(export);
850
851 exit_unlock:
852         cfs_spin_unlock(&obd->obd_dev_lock);
853 exit_err:
854         if (hash)
855                 cfs_hash_putref(hash);
856         class_handle_unhash(&export->exp_handle);
857         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
858         obd_destroy_export(export);
859         OBD_FREE_PTR(export);
860         return ERR_PTR(rc);
861 }
862 EXPORT_SYMBOL(class_new_export);
863
864 void class_unlink_export(struct obd_export *exp)
865 {
866         class_handle_unhash(&exp->exp_handle);
867
868         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
869         /* delete an uuid-export hashitem from hashtables */
870         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
871                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
872                              &exp->exp_client_uuid,
873                              &exp->exp_uuid_hash);
874
875         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
876         cfs_list_del_init(&exp->exp_obd_chain_timed);
877         exp->exp_obd->obd_num_exports--;
878         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
879         class_export_put(exp);
880 }
881 EXPORT_SYMBOL(class_unlink_export);
882
883 /* Import management functions */
884 void class_import_destroy(struct obd_import *imp)
885 {
886         ENTRY;
887
888         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
889                 imp->imp_obd->obd_name);
890
891         LASSERT(cfs_atomic_read(&imp->imp_refcount) == 0);
892
893         ptlrpc_put_connection_superhack(imp->imp_connection);
894
895         while (!cfs_list_empty(&imp->imp_conn_list)) {
896                 struct obd_import_conn *imp_conn;
897
898                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
899                                           struct obd_import_conn, oic_item);
900                 cfs_list_del_init(&imp_conn->oic_item);
901                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
902                 OBD_FREE(imp_conn, sizeof(*imp_conn));
903         }
904
905         LASSERT(imp->imp_sec == NULL);
906         class_decref(imp->imp_obd, "import", imp);
907         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
908         EXIT;
909 }
910
/* Addref callback registered with class_handle_hash() for imports: takes
 * one import reference and discards the returned pointer. */
static void import_handle_addref(void *import)
{
        (void)class_import_get(import);
}
915
916 struct obd_import *class_import_get(struct obd_import *import)
917 {
918         LASSERT(cfs_atomic_read(&import->imp_refcount) >= 0);
919         LASSERT(cfs_atomic_read(&import->imp_refcount) < 0x5a5a5a);
920         cfs_atomic_inc(&import->imp_refcount);
921         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
922                cfs_atomic_read(&import->imp_refcount),
923                import->imp_obd->obd_name);
924         return import;
925 }
926 EXPORT_SYMBOL(class_import_get);
927
928 void class_import_put(struct obd_import *imp)
929 {
930         ENTRY;
931
932         LASSERT(cfs_atomic_read(&imp->imp_refcount) > 0);
933         LASSERT(cfs_atomic_read(&imp->imp_refcount) < 0x5a5a5a);
934         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
935
936         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
937                cfs_atomic_read(&imp->imp_refcount) - 1,
938                imp->imp_obd->obd_name);
939
940         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
941                 CDEBUG(D_INFO, "final put import %p\n", imp);
942                 obd_zombie_import_add(imp);
943         }
944
945         EXIT;
946 }
947 EXPORT_SYMBOL(class_import_put);
948
949 static void init_imp_at(struct imp_at *at) {
950         int i;
951         at_init(&at->iat_net_latency, 0, 0);
952         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
953                 /* max service estimates are tracked on the server side, so
954                    don't use the AT history here, just use the last reported
955                    val. (But keep hist for proc histogram, worst_ever) */
956                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
957                         AT_FLG_NOHIST);
958         }
959 }
960
961 struct obd_import *class_new_import(struct obd_device *obd)
962 {
963         struct obd_import *imp;
964
965         OBD_ALLOC(imp, sizeof(*imp));
966         if (imp == NULL)
967                 return NULL;
968
969         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
970         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
971         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
972         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
973         cfs_spin_lock_init(&imp->imp_lock);
974         imp->imp_last_success_conn = 0;
975         imp->imp_state = LUSTRE_IMP_NEW;
976         imp->imp_obd = class_incref(obd, "import", imp);
977         cfs_sema_init(&imp->imp_sec_mutex, 1);
978         cfs_waitq_init(&imp->imp_recovery_waitq);
979
980         cfs_atomic_set(&imp->imp_refcount, 2);
981         cfs_atomic_set(&imp->imp_unregistering, 0);
982         cfs_atomic_set(&imp->imp_inflight, 0);
983         cfs_atomic_set(&imp->imp_replay_inflight, 0);
984         cfs_atomic_set(&imp->imp_inval_count, 0);
985         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
986         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
987         class_handle_hash(&imp->imp_handle, import_handle_addref);
988         init_imp_at(&imp->imp_at);
989
990         /* the default magic is V2, will be used in connect RPC, and
991          * then adjusted according to the flags in request/reply. */
992         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
993
994         return imp;
995 }
996 EXPORT_SYMBOL(class_new_import);
997
998 void class_destroy_import(struct obd_import *import)
999 {
1000         LASSERT(import != NULL);
1001         LASSERT(import != LP_POISON);
1002
1003         class_handle_unhash(&import->imp_handle);
1004
1005         cfs_spin_lock(&import->imp_lock);
1006         import->imp_generation++;
1007         cfs_spin_unlock(&import->imp_lock);
1008         class_import_put(import);
1009 }
1010 EXPORT_SYMBOL(class_destroy_import);
1011
1012 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1013
1014 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1015 {
1016         cfs_spin_lock(&exp->exp_locks_list_guard);
1017
1018         LASSERT(lock->l_exp_refs_nr >= 0);
1019
1020         if (lock->l_exp_refs_target != NULL &&
1021             lock->l_exp_refs_target != exp) {
1022                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1023                               exp, lock, lock->l_exp_refs_target);
1024         }
1025         if ((lock->l_exp_refs_nr ++) == 0) {
1026                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1027                 lock->l_exp_refs_target = exp;
1028         }
1029         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1030                lock, exp, lock->l_exp_refs_nr);
1031         cfs_spin_unlock(&exp->exp_locks_list_guard);
1032 }
1033 EXPORT_SYMBOL(__class_export_add_lock_ref);
1034
1035 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1036 {
1037         cfs_spin_lock(&exp->exp_locks_list_guard);
1038         LASSERT(lock->l_exp_refs_nr > 0);
1039         if (lock->l_exp_refs_target != exp) {
1040                 LCONSOLE_WARN("lock %p, "
1041                               "mismatching export pointers: %p, %p\n",
1042                               lock, lock->l_exp_refs_target, exp);
1043         }
1044         if (-- lock->l_exp_refs_nr == 0) {
1045                 cfs_list_del_init(&lock->l_exp_refs_link);
1046                 lock->l_exp_refs_target = NULL;
1047         }
1048         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1049                lock, exp, lock->l_exp_refs_nr);
1050         cfs_spin_unlock(&exp->exp_locks_list_guard);
1051 }
1052 EXPORT_SYMBOL(__class_export_del_lock_ref);
1053 #endif
1054
1055 /* A connection defines an export context in which preallocation can
1056    be managed. This releases the export pointer reference, and returns
1057    the export handle, so the export refcount is 1 when this function
1058    returns. */
1059 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1060                   struct obd_uuid *cluuid)
1061 {
1062         struct obd_export *export;
1063         LASSERT(conn != NULL);
1064         LASSERT(obd != NULL);
1065         LASSERT(cluuid != NULL);
1066         ENTRY;
1067
1068         export = class_new_export(obd, cluuid);
1069         if (IS_ERR(export))
1070                 RETURN(PTR_ERR(export));
1071
1072         conn->cookie = export->exp_handle.h_cookie;
1073         class_export_put(export);
1074
1075         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1076                cluuid->uuid, conn->cookie);
1077         RETURN(0);
1078 }
1079 EXPORT_SYMBOL(class_connect);
1080
1081 /* if export is involved in recovery then clean up related things */
1082 void class_export_recovery_cleanup(struct obd_export *exp)
1083 {
1084         struct obd_device *obd = exp->exp_obd;
1085
1086         cfs_spin_lock(&obd->obd_recovery_task_lock);
1087         if (exp->exp_delayed)
1088                 obd->obd_delayed_clients--;
1089         if (obd->obd_recovering && exp->exp_in_recovery) {
1090                 cfs_spin_lock(&exp->exp_lock);
1091                 exp->exp_in_recovery = 0;
1092                 cfs_spin_unlock(&exp->exp_lock);
1093                 LASSERT(obd->obd_connected_clients);
1094                 obd->obd_connected_clients--;
1095         }
1096         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1097         /** Cleanup req replay fields */
1098         if (exp->exp_req_replay_needed) {
1099                 cfs_spin_lock(&exp->exp_lock);
1100                 exp->exp_req_replay_needed = 0;
1101                 cfs_spin_unlock(&exp->exp_lock);
1102                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1103                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1104         }
1105         /** Cleanup lock replay data */
1106         if (exp->exp_lock_replay_needed) {
1107                 cfs_spin_lock(&exp->exp_lock);
1108                 exp->exp_lock_replay_needed = 0;
1109                 cfs_spin_unlock(&exp->exp_lock);
1110                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1111                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1112         }
1113 }
1114
1115 /* This function removes 1-3 references from the export:
1116  * 1 - for export pointer passed
1117  * and if disconnect really need
1118  * 2 - removing from hash
1119  * 3 - in client_unlink_export
1120  * The export pointer passed to this function can destroyed */
1121 int class_disconnect(struct obd_export *export)
1122 {
1123         int already_disconnected;
1124         ENTRY;
1125
1126         if (export == NULL) {
1127                 fixme();
1128                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1129                 RETURN(-EINVAL);
1130         }
1131
1132         cfs_spin_lock(&export->exp_lock);
1133         already_disconnected = export->exp_disconnected;
1134         export->exp_disconnected = 1;
1135         cfs_spin_unlock(&export->exp_lock);
1136
1137         /* class_cleanup(), abort_recovery(), and class_fail_export()
1138          * all end up in here, and if any of them race we shouldn't
1139          * call extra class_export_puts(). */
1140         if (already_disconnected) {
1141                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1142                 GOTO(no_disconn, already_disconnected);
1143         }
1144
1145         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1146                export->exp_handle.h_cookie);
1147
1148         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1149                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1150                              &export->exp_connection->c_peer.nid,
1151                              &export->exp_nid_hash);
1152
1153         class_export_recovery_cleanup(export);
1154         class_unlink_export(export);
1155 no_disconn:
1156         class_export_put(export);
1157         RETURN(0);
1158 }
1159
1160 /* Return non-zero for a fully connected export */
1161 int class_connected_export(struct obd_export *exp)
1162 {
1163         if (exp) {
1164                 int connected;
1165                 cfs_spin_lock(&exp->exp_lock);
1166                 connected = (exp->exp_conn_cnt > 0);
1167                 cfs_spin_unlock(&exp->exp_lock);
1168                 return connected;
1169         }
1170         return 0;
1171 }
1172 EXPORT_SYMBOL(class_connected_export);
1173
1174 static void class_disconnect_export_list(cfs_list_t *list,
1175                                          enum obd_option flags)
1176 {
1177         int rc;
1178         struct obd_export *exp;
1179         ENTRY;
1180
1181         /* It's possible that an export may disconnect itself, but
1182          * nothing else will be added to this list. */
1183         while (!cfs_list_empty(list)) {
1184                 exp = cfs_list_entry(list->next, struct obd_export,
1185                                      exp_obd_chain);
1186                 /* need for safe call CDEBUG after obd_disconnect */
1187                 class_export_get(exp);
1188
1189                 cfs_spin_lock(&exp->exp_lock);
1190                 exp->exp_flags = flags;
1191                 cfs_spin_unlock(&exp->exp_lock);
1192
1193                 if (obd_uuid_equals(&exp->exp_client_uuid,
1194                                     &exp->exp_obd->obd_uuid)) {
1195                         CDEBUG(D_HA,
1196                                "exp %p export uuid == obd uuid, don't discon\n",
1197                                exp);
1198                         /* Need to delete this now so we don't end up pointing
1199                          * to work_list later when this export is cleaned up. */
1200                         cfs_list_del_init(&exp->exp_obd_chain);
1201                         class_export_put(exp);
1202                         continue;
1203                 }
1204
1205                 class_export_get(exp);
1206                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1207                        "last request at "CFS_TIME_T"\n",
1208                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1209                        exp, exp->exp_last_request_time);
1210                 /* release one export reference anyway */
1211                 rc = obd_disconnect(exp);
1212
1213                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1214                        obd_export_nid2str(exp), exp, rc);
1215                 class_export_put(exp);
1216         }
1217         EXIT;
1218 }
1219
1220 void class_disconnect_exports(struct obd_device *obd)
1221 {
1222         cfs_list_t work_list;
1223         ENTRY;
1224
1225         /* Move all of the exports from obd_exports to a work list, en masse. */
1226         CFS_INIT_LIST_HEAD(&work_list);
1227         cfs_spin_lock(&obd->obd_dev_lock);
1228         cfs_list_splice_init(&obd->obd_exports, &work_list);
1229         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1230         cfs_spin_unlock(&obd->obd_dev_lock);
1231
1232         if (!cfs_list_empty(&work_list)) {
1233                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1234                        "disconnecting them\n", obd->obd_minor, obd);
1235                 class_disconnect_export_list(&work_list,
1236                                              exp_flags_from_obd(obd));
1237         } else
1238                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1239                        obd->obd_minor, obd);
1240         EXIT;
1241 }
1242 EXPORT_SYMBOL(class_disconnect_exports);
1243
1244 /* Remove exports that have not completed recovery.
1245  */
1246 void class_disconnect_stale_exports(struct obd_device *obd,
1247                                     int (*test_export)(struct obd_export *))
1248 {
1249         cfs_list_t work_list;
1250         cfs_list_t *pos, *n;
1251         struct obd_export *exp;
1252         int evicted = 0;
1253         ENTRY;
1254
1255         CFS_INIT_LIST_HEAD(&work_list);
1256         cfs_spin_lock(&obd->obd_dev_lock);
1257         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1258                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1259                 if (test_export(exp))
1260                         continue;
1261
1262                 /* don't count self-export as client */
1263                 if (obd_uuid_equals(&exp->exp_client_uuid,
1264                                     &exp->exp_obd->obd_uuid))
1265                         continue;
1266
1267                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1268                 evicted++;
1269                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1270                        obd->obd_name, exp->exp_client_uuid.uuid,
1271                        exp->exp_connection == NULL ? "<unknown>" :
1272                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1273                 print_export_data(exp, "EVICTING", 0);
1274         }
1275         cfs_spin_unlock(&obd->obd_dev_lock);
1276
1277         if (evicted) {
1278                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1279                        obd->obd_name, evicted);
1280                 obd->obd_stale_clients += evicted;
1281         }
1282         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1283                                                  OBD_OPT_ABORT_RECOV);
1284         EXIT;
1285 }
1286 EXPORT_SYMBOL(class_disconnect_stale_exports);
1287
1288 void class_fail_export(struct obd_export *exp)
1289 {
1290         int rc, already_failed;
1291
1292         cfs_spin_lock(&exp->exp_lock);
1293         already_failed = exp->exp_failed;
1294         exp->exp_failed = 1;
1295         cfs_spin_unlock(&exp->exp_lock);
1296
1297         if (already_failed) {
1298                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1299                        exp, exp->exp_client_uuid.uuid);
1300                 return;
1301         }
1302
1303         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1304                exp, exp->exp_client_uuid.uuid);
1305
1306         if (obd_dump_on_timeout)
1307                 libcfs_debug_dumplog();
1308
1309         /* Most callers into obd_disconnect are removing their own reference
1310          * (request, for example) in addition to the one from the hash table.
1311          * We don't have such a reference here, so make one. */
1312         class_export_get(exp);
1313         rc = obd_disconnect(exp);
1314         if (rc)
1315                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1316         else
1317                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1318                        exp, exp->exp_client_uuid.uuid);
1319 }
1320 EXPORT_SYMBOL(class_fail_export);
1321
1322 char *obd_export_nid2str(struct obd_export *exp)
1323 {
1324         if (exp->exp_connection != NULL)
1325                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1326
1327         return "(no nid)";
1328 }
1329 EXPORT_SYMBOL(obd_export_nid2str);
1330
1331 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1332 {
1333         struct obd_export *doomed_exp = NULL;
1334         int exports_evicted = 0;
1335
1336         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1337
1338         do {
1339                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1340                 if (doomed_exp == NULL)
1341                         break;
1342
1343                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1344                          "nid %s found, wanted nid %s, requested nid %s\n",
1345                          obd_export_nid2str(doomed_exp),
1346                          libcfs_nid2str(nid_key), nid);
1347                 LASSERTF(doomed_exp != obd->obd_self_export,
1348                          "self-export is hashed by NID?\n");
1349                 exports_evicted++;
1350                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1351                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1352                        exports_evicted);
1353                 class_fail_export(doomed_exp);
1354                 class_export_put(doomed_exp);
1355         } while (1);
1356
1357         if (!exports_evicted)
1358                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1359                        obd->obd_name, nid);
1360         return exports_evicted;
1361 }
1362 EXPORT_SYMBOL(obd_export_evict_by_nid);
1363
1364 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1365 {
1366         struct obd_export *doomed_exp = NULL;
1367         struct obd_uuid doomed_uuid;
1368         int exports_evicted = 0;
1369
1370         obd_str2uuid(&doomed_uuid, uuid);
1371         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1372                 CERROR("%s: can't evict myself\n", obd->obd_name);
1373                 return exports_evicted;
1374         }
1375
1376         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1377
1378         if (doomed_exp == NULL) {
1379                 CERROR("%s: can't disconnect %s: no exports found\n",
1380                        obd->obd_name, uuid);
1381         } else {
1382                 CWARN("%s: evicting %s at adminstrative request\n",
1383                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1384                 class_fail_export(doomed_exp);
1385                 class_export_put(doomed_exp);
1386                 exports_evicted++;
1387         }
1388
1389         return exports_evicted;
1390 }
1391 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1392
1393 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1394 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1395 EXPORT_SYMBOL(class_export_dump_hook);
1396 #endif
1397
1398 static void print_export_data(struct obd_export *exp, const char *status,
1399                               int locks)
1400 {
1401         struct ptlrpc_reply_state *rs;
1402         struct ptlrpc_reply_state *first_reply = NULL;
1403         int nreplies = 0;
1404
1405         cfs_spin_lock(&exp->exp_lock);
1406         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1407                                 rs_exp_list) {
1408                 if (nreplies == 0)
1409                         first_reply = rs;
1410                 nreplies++;
1411         }
1412         cfs_spin_unlock(&exp->exp_lock);
1413
1414         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1415                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1416                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1417                cfs_atomic_read(&exp->exp_rpc_count),
1418                cfs_atomic_read(&exp->exp_cb_count),
1419                cfs_atomic_read(&exp->exp_locks_count),
1420                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1421                nreplies, first_reply, nreplies > 3 ? "..." : "",
1422                exp->exp_last_committed);
1423 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1424         if (locks && class_export_dump_hook != NULL)
1425                 class_export_dump_hook(exp);
1426 #endif
1427 }
1428
1429 void dump_exports(struct obd_device *obd, int locks)
1430 {
1431         struct obd_export *exp;
1432
1433         cfs_spin_lock(&obd->obd_dev_lock);
1434         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1435                 print_export_data(exp, "ACTIVE", locks);
1436         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1437                 print_export_data(exp, "UNLINKED", locks);
1438         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1439                 print_export_data(exp, "DELAYED", locks);
1440         cfs_spin_unlock(&obd->obd_dev_lock);
1441         cfs_spin_lock(&obd_zombie_impexp_lock);
1442         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1443                 print_export_data(exp, "ZOMBIE", locks);
1444         cfs_spin_unlock(&obd_zombie_impexp_lock);
1445 }
1446 EXPORT_SYMBOL(dump_exports);
1447
1448 void obd_exports_barrier(struct obd_device *obd)
1449 {
1450         int waited = 2;
1451         LASSERT(cfs_list_empty(&obd->obd_exports));
1452         cfs_spin_lock(&obd->obd_dev_lock);
1453         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1454                 cfs_spin_unlock(&obd->obd_dev_lock);
1455                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1456                                                    cfs_time_seconds(waited));
1457                 if (waited > 5 && IS_PO2(waited)) {
1458                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1459                                       "more than %d seconds. "
1460                                       "The obd refcount = %d. Is it stuck?\n",
1461                                       obd->obd_name, waited,
1462                                       cfs_atomic_read(&obd->obd_refcount));
1463                         dump_exports(obd, 1);
1464                 }
1465                 waited *= 2;
1466                 cfs_spin_lock(&obd->obd_dev_lock);
1467         }
1468         cfs_spin_unlock(&obd->obd_dev_lock);
1469 }
1470 EXPORT_SYMBOL(obd_exports_barrier);
1471
1472 /**
1473  * kill zombie imports and exports
1474  */
1475 void obd_zombie_impexp_cull(void)
1476 {
1477         struct obd_import *import;
1478         struct obd_export *export;
1479         ENTRY;
1480
1481         do {
1482                 cfs_spin_lock(&obd_zombie_impexp_lock);
1483
1484                 import = NULL;
1485                 if (!cfs_list_empty(&obd_zombie_imports)) {
1486                         import = cfs_list_entry(obd_zombie_imports.next,
1487                                                 struct obd_import,
1488                                                 imp_zombie_chain);
1489                         cfs_list_del_init(&import->imp_zombie_chain);
1490                 }
1491
1492                 export = NULL;
1493                 if (!cfs_list_empty(&obd_zombie_exports)) {
1494                         export = cfs_list_entry(obd_zombie_exports.next,
1495                                                 struct obd_export,
1496                                                 exp_obd_chain);
1497                         cfs_list_del_init(&export->exp_obd_chain);
1498                 }
1499
1500                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1501
1502                 if (import != NULL)
1503                         class_import_destroy(import);
1504
1505                 if (export != NULL)
1506                         class_export_destroy(export);
1507
1508                 cfs_cond_resched();
1509         } while (import != NULL || export != NULL);
1510         EXIT;
1511 }
1512
1513 static cfs_completion_t         obd_zombie_start;
1514 static cfs_completion_t         obd_zombie_stop;
1515 static unsigned long            obd_zombie_flags;
1516 static cfs_waitq_t              obd_zombie_waitq;
1517 static pid_t                    obd_zombie_pid;
1518
1519 enum {
1520         OBD_ZOMBIE_STOP   = 1 << 1
1521 };
1522
1523 /**
1524  * check for work for kill zombie import/export thread.
1525  */
1526 static int obd_zombie_impexp_check(void *arg)
1527 {
1528         int rc;
1529
1530         cfs_spin_lock(&obd_zombie_impexp_lock);
1531         rc = cfs_list_empty(&obd_zombie_imports) &&
1532              cfs_list_empty(&obd_zombie_exports) &&
1533              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1534
1535         cfs_spin_unlock(&obd_zombie_impexp_lock);
1536
1537         RETURN(rc);
1538 }
1539
1540 /**
1541  * Add export to the obd_zombe thread and notify it.
1542  */
1543 static void obd_zombie_export_add(struct obd_export *exp) {
1544         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1545         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1546         cfs_list_del_init(&exp->exp_obd_chain);
1547         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1548         cfs_spin_lock(&obd_zombie_impexp_lock);
1549         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1550         cfs_spin_unlock(&obd_zombie_impexp_lock);
1551
1552         if (obd_zombie_impexp_notify != NULL)
1553                 obd_zombie_impexp_notify();
1554 }
1555
1556 /**
1557  * Add import to the obd_zombe thread and notify it.
1558  */
1559 static void obd_zombie_import_add(struct obd_import *imp) {
1560         LASSERT(imp->imp_sec == NULL);
1561         cfs_spin_lock(&obd_zombie_impexp_lock);
1562         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1563         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1564         cfs_spin_unlock(&obd_zombie_impexp_lock);
1565
1566         if (obd_zombie_impexp_notify != NULL)
1567                 obd_zombie_impexp_notify();
1568 }
1569
1570 /**
1571  * notify import/export destroy thread about new zombie.
1572  */
1573 static void obd_zombie_impexp_notify(void)
1574 {
1575         cfs_waitq_signal(&obd_zombie_waitq);
1576 }
1577
1578 /**
1579  * check whether obd_zombie is idle
1580  */
1581 static int obd_zombie_is_idle(void)
1582 {
1583         int rc;
1584
1585         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1586         cfs_spin_lock(&obd_zombie_impexp_lock);
1587         rc = cfs_list_empty(&obd_zombie_imports) &&
1588              cfs_list_empty(&obd_zombie_exports);
1589         cfs_spin_unlock(&obd_zombie_impexp_lock);
1590         return rc;
1591 }
1592
1593 /**
1594  * wait when obd_zombie import/export queues become empty
1595  */
1596 void obd_zombie_barrier(void)
1597 {
1598         struct l_wait_info lwi = { 0 };
1599
1600         if (obd_zombie_pid == cfs_curproc_pid())
1601                 /* don't wait for myself */
1602                 return;
1603         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1604 }
1605 EXPORT_SYMBOL(obd_zombie_barrier);
1606
1607 #ifdef __KERNEL__
1608
1609 /**
1610  * destroy zombie export/import thread.
1611  */
1612 static int obd_zombie_impexp_thread(void *unused)
1613 {
1614         int rc;
1615
1616         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1617                 cfs_complete(&obd_zombie_start);
1618                 RETURN(rc);
1619         }
1620
1621         cfs_complete(&obd_zombie_start);
1622
1623         obd_zombie_pid = cfs_curproc_pid();
1624
1625         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1626                 struct l_wait_info lwi = { 0 };
1627
1628                 l_wait_event(obd_zombie_waitq,
1629                              !obd_zombie_impexp_check(NULL), &lwi);
1630                 obd_zombie_impexp_cull();
1631
1632                 /*
1633                  * Notify obd_zombie_barrier callers that queues
1634                  * may be empty.
1635                  */
1636                 cfs_waitq_signal(&obd_zombie_waitq);
1637         }
1638
1639         cfs_complete(&obd_zombie_stop);
1640
1641         RETURN(0);
1642 }
1643
1644 #else /* ! KERNEL */
1645
1646 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1647 static void *obd_zombie_impexp_work_cb;
1648 static void *obd_zombie_impexp_idle_cb;
1649
1650 int obd_zombie_impexp_kill(void *arg)
1651 {
1652         int rc = 0;
1653
1654         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1655                 obd_zombie_impexp_cull();
1656                 rc = 1;
1657         }
1658         cfs_atomic_dec(&zombie_recur);
1659         return rc;
1660 }
1661
1662 #endif
1663
1664 /**
1665  * start destroy zombie import/export thread
1666  */
1667 int obd_zombie_impexp_init(void)
1668 {
1669         int rc;
1670
1671         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1672         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1673         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1674         cfs_init_completion(&obd_zombie_start);
1675         cfs_init_completion(&obd_zombie_stop);
1676         cfs_waitq_init(&obd_zombie_waitq);
1677         obd_zombie_pid = 0;
1678
1679 #ifdef __KERNEL__
1680         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1681         if (rc < 0)
1682                 RETURN(rc);
1683
1684         cfs_wait_for_completion(&obd_zombie_start);
1685 #else
1686
1687         obd_zombie_impexp_work_cb =
1688                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1689                                                  &obd_zombie_impexp_kill, NULL);
1690
1691         obd_zombie_impexp_idle_cb =
1692                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1693                                                  &obd_zombie_impexp_check, NULL);
1694         rc = 0;
1695 #endif
1696         RETURN(rc);
1697 }
1698 /**
1699  * stop destroy zombie import/export thread
1700  */
1701 void obd_zombie_impexp_stop(void)
1702 {
1703         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1704         obd_zombie_impexp_notify();
1705 #ifdef __KERNEL__
1706         cfs_wait_for_completion(&obd_zombie_stop);
1707 #else
1708         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1709         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1710 #endif
1711 }
1712
1713 /***** Kernel-userspace comm helpers *******/
1714
1715 /* Get length of entire message, including header */
1716 int kuc_len(int payload_len)
1717 {
1718         return sizeof(struct kuc_hdr) + payload_len;
1719 }
1720 EXPORT_SYMBOL(kuc_len);
1721
1722 /* Get a pointer to kuc header, given a ptr to the payload
1723  * @param p Pointer to payload area
1724  * @returns Pointer to kuc header
1725  */
1726 struct kuc_hdr * kuc_ptr(void *p)
1727 {
1728         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1729         LASSERT(lh->kuc_magic == KUC_MAGIC);
1730         return lh;
1731 }
1732 EXPORT_SYMBOL(kuc_ptr);
1733
1734 /* Test if payload is part of kuc message
1735  * @param p Pointer to payload area
1736  * @returns boolean
1737  */
1738 int kuc_ispayload(void *p)
1739 {
1740         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1741
1742         if (kh->kuc_magic == KUC_MAGIC)
1743                 return 1;
1744         else
1745                 return 0;
1746 }
1747 EXPORT_SYMBOL(kuc_ispayload);
1748
1749 /* Alloc space for a message, and fill in header
1750  * @return Pointer to payload area
1751  */
1752 void *kuc_alloc(int payload_len, int transport, int type)
1753 {
1754         struct kuc_hdr *lh;
1755         int len = kuc_len(payload_len);
1756
1757         OBD_ALLOC(lh, len);
1758         if (lh == NULL)
1759                 return ERR_PTR(-ENOMEM);
1760
1761         lh->kuc_magic = KUC_MAGIC;
1762         lh->kuc_transport = transport;
1763         lh->kuc_msgtype = type;
1764         lh->kuc_msglen = len;
1765
1766         return (void *)(lh + 1);
1767 }
1768 EXPORT_SYMBOL(kuc_alloc);
1769
/* Release a kuc message.
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Size of the payload area in bytes; the header size
 *                    is added via kuc_len() to get the size to free
 */
inline void kuc_free(void *p, int payload_len)
{
        int len = kuc_len(payload_len);
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, len);
}
EXPORT_SYMBOL(kuc_free);
1777
1778
1779