Whamcloud - gitweb
LU-365 Update copyright for files modified by Whamcloud
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
/* List of registered obd types; defined elsewhere. In this file it is
 * walked and modified while holding obd_types_lock. */
extern cfs_list_t obd_types;
cfs_spinlock_t obd_types_lock;

/* Slab caches set up by obd_init_caches() and released by
 * obd_cleanup_caches(). */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* Zombie import/export bookkeeping.
 * NOTE(review): presumably obd_zombie_impexp_lock guards both lists;
 * the code that uses them is not in this part of the file -- confirm. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations for static helpers defined later in the file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook used by class_export_destroy() and class_import_destroy() to drop
 * a ptlrpc connection reference; it is assigned outside this file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151
152 #define CLASS_MAX_NAME 1024
153
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155                         struct lprocfs_vars *vars, const char *name,
156                         struct lu_device_type *ldt)
157 {
158         struct obd_type *type;
159         int rc = 0;
160         ENTRY;
161
162         /* sanity check */
163         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164
165         if (class_search_type(name)) {
166                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
167                 RETURN(-EEXIST);
168         }
169
170         rc = -ENOMEM;
171         OBD_ALLOC(type, sizeof(*type));
172         if (type == NULL)
173                 RETURN(rc);
174
175         OBD_ALLOC_PTR(type->typ_dt_ops);
176         OBD_ALLOC_PTR(type->typ_md_ops);
177         OBD_ALLOC(type->typ_name, strlen(name) + 1);
178
179         if (type->typ_dt_ops == NULL ||
180             type->typ_md_ops == NULL ||
181             type->typ_name == NULL)
182                 GOTO (failed, rc);
183
184         *(type->typ_dt_ops) = *dt_ops;
185         /* md_ops is optional */
186         if (md_ops)
187                 *(type->typ_md_ops) = *md_ops;
188         strcpy(type->typ_name, name);
189         cfs_spin_lock_init(&type->obd_type_lock);
190
191 #ifdef LPROCFS
192         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193                                               vars, type);
194         if (IS_ERR(type->typ_procroot)) {
195                 rc = PTR_ERR(type->typ_procroot);
196                 type->typ_procroot = NULL;
197                 GOTO (failed, rc);
198         }
199 #endif
200         if (ldt != NULL) {
201                 type->typ_lu = ldt;
202                 rc = lu_device_type_init(ldt);
203                 if (rc != 0)
204                         GOTO (failed, rc);
205         }
206
207         cfs_spin_lock(&obd_types_lock);
208         cfs_list_add(&type->typ_chain, &obd_types);
209         cfs_spin_unlock(&obd_types_lock);
210
211         RETURN (0);
212
213  failed:
214         if (type->typ_name != NULL)
215                 OBD_FREE(type->typ_name, strlen(name) + 1);
216         if (type->typ_md_ops != NULL)
217                 OBD_FREE_PTR(type->typ_md_ops);
218         if (type->typ_dt_ops != NULL)
219                 OBD_FREE_PTR(type->typ_dt_ops);
220         OBD_FREE(type, sizeof(*type));
221         RETURN(rc);
222 }
223
224 int class_unregister_type(const char *name)
225 {
226         struct obd_type *type = class_search_type(name);
227         ENTRY;
228
229         if (!type) {
230                 CERROR("unknown obd type\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (type->typ_refcnt) {
235                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236                 /* This is a bad situation, let's make the best of it */
237                 /* Remove ops, but leave the name for debugging */
238                 OBD_FREE_PTR(type->typ_dt_ops);
239                 OBD_FREE_PTR(type->typ_md_ops);
240                 RETURN(-EBUSY);
241         }
242
243         if (type->typ_procroot) {
244                 lprocfs_remove(&type->typ_procroot);
245         }
246
247         if (type->typ_lu)
248                 lu_device_type_fini(type->typ_lu);
249
250         cfs_spin_lock(&obd_types_lock);
251         cfs_list_del(&type->typ_chain);
252         cfs_spin_unlock(&obd_types_lock);
253         OBD_FREE(type->typ_name, strlen(name) + 1);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         if (type->typ_md_ops != NULL)
257                 OBD_FREE_PTR(type->typ_md_ops);
258         OBD_FREE(type, sizeof(*type));
259         RETURN(0);
260 } /* class_unregister_type */
261
262 /**
263  * Create a new obd device.
264  *
265  * Find an empty slot in ::obd_devs[], create a new obd device in it.
266  *
267  * \param[in] type_name obd device type string.
268  * \param[in] name      obd device name.
269  *
270  * \retval NULL if create fails, otherwise return the obd device
271  *         pointer created.
272  */
273 struct obd_device *class_newdev(const char *type_name, const char *name)
274 {
275         struct obd_device *result = NULL;
276         struct obd_device *newdev;
277         struct obd_type *type = NULL;
278         int i;
279         int new_obd_minor = 0;
280
281         if (strlen(name) >= MAX_OBD_NAME) {
282                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
283                 RETURN(ERR_PTR(-EINVAL));
284         }
285
286         type = class_get_type(type_name);
287         if (type == NULL){
288                 CERROR("OBD: unknown type: %s\n", type_name);
289                 RETURN(ERR_PTR(-ENODEV));
290         }
291
292         newdev = obd_device_alloc();
293         if (newdev == NULL) {
294                 class_put_type(type);
295                 RETURN(ERR_PTR(-ENOMEM));
296         }
297         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
298
299         cfs_spin_lock(&obd_dev_lock);
300         for (i = 0; i < class_devno_max(); i++) {
301                 struct obd_device *obd = class_num2obd(i);
302                 if (obd && obd->obd_name &&
303                     (strcmp(name, obd->obd_name) == 0)) {
304                         CERROR("Device %s already exists, won't add\n", name);
305                         if (result) {
306                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
307                                          "%p obd_magic %08x != %08x\n", result,
308                                          result->obd_magic, OBD_DEVICE_MAGIC);
309                                 LASSERTF(result->obd_minor == new_obd_minor,
310                                          "%p obd_minor %d != %d\n", result,
311                                          result->obd_minor, new_obd_minor);
312
313                                 obd_devs[result->obd_minor] = NULL;
314                                 result->obd_name[0]='\0';
315                          }
316                         result = ERR_PTR(-EEXIST);
317                         break;
318                 }
319                 if (!result && !obd) {
320                         result = newdev;
321                         result->obd_minor = i;
322                         new_obd_minor = i;
323                         result->obd_type = type;
324                         strncpy(result->obd_name, name,
325                                 sizeof(result->obd_name) - 1);
326                         obd_devs[i] = result;
327                 }
328         }
329         cfs_spin_unlock(&obd_dev_lock);
330
331         if (result == NULL && i >= class_devno_max()) {
332                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
333                        class_devno_max());
334                 result = ERR_PTR(-EOVERFLOW);
335         }
336
337         if (IS_ERR(result)) {
338                 obd_device_free(newdev);
339                 class_put_type(type);
340         } else {
341                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
342                        result->obd_name, result);
343         }
344         return result;
345 }
346
347 void class_release_dev(struct obd_device *obd)
348 {
349         struct obd_type *obd_type = obd->obd_type;
350
351         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
352                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
353         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
354                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355         LASSERT(obd_type != NULL);
356
357         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
358                obd->obd_name,obd->obd_type->typ_name);
359
360         cfs_spin_lock(&obd_dev_lock);
361         obd_devs[obd->obd_minor] = NULL;
362         cfs_spin_unlock(&obd_dev_lock);
363         obd_device_free(obd);
364
365         class_put_type(obd_type);
366 }
367
368 int class_name2dev(const char *name)
369 {
370         int i;
371
372         if (!name)
373                 return -1;
374
375         cfs_spin_lock(&obd_dev_lock);
376         for (i = 0; i < class_devno_max(); i++) {
377                 struct obd_device *obd = class_num2obd(i);
378                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
379                         /* Make sure we finished attaching before we give
380                            out any references */
381                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382                         if (obd->obd_attached) {
383                                 cfs_spin_unlock(&obd_dev_lock);
384                                 return i;
385                         }
386                         break;
387                 }
388         }
389         cfs_spin_unlock(&obd_dev_lock);
390
391         return -1;
392 }
393
394 struct obd_device *class_name2obd(const char *name)
395 {
396         int dev = class_name2dev(name);
397
398         if (dev < 0 || dev > class_devno_max())
399                 return NULL;
400         return class_num2obd(dev);
401 }
402
403 int class_uuid2dev(struct obd_uuid *uuid)
404 {
405         int i;
406
407         cfs_spin_lock(&obd_dev_lock);
408         for (i = 0; i < class_devno_max(); i++) {
409                 struct obd_device *obd = class_num2obd(i);
410                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         cfs_spin_unlock(&obd_dev_lock);
413                         return i;
414                 }
415         }
416         cfs_spin_unlock(&obd_dev_lock);
417
418         return -1;
419 }
420
421 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
422 {
423         int dev = class_uuid2dev(uuid);
424         if (dev < 0)
425                 return NULL;
426         return class_num2obd(dev);
427 }
428
429 /**
430  * Get obd device from ::obd_devs[]
431  *
432  * \param num [in] array index
433  *
434  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
435  *         otherwise return the obd device there.
436  */
437 struct obd_device *class_num2obd(int num)
438 {
439         struct obd_device *obd = NULL;
440
441         if (num < class_devno_max()) {
442                 obd = obd_devs[num];
443                 if (obd == NULL)
444                         return NULL;
445
446                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
447                          "%p obd_magic %08x != %08x\n",
448                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
449                 LASSERTF(obd->obd_minor == num,
450                          "%p obd_minor %0d != %0d\n",
451                          obd, obd->obd_minor, num);
452         }
453
454         return obd;
455 }
456
457 void class_obd_list(void)
458 {
459         char *status;
460         int i;
461
462         cfs_spin_lock(&obd_dev_lock);
463         for (i = 0; i < class_devno_max(); i++) {
464                 struct obd_device *obd = class_num2obd(i);
465                 if (obd == NULL)
466                         continue;
467                 if (obd->obd_stopping)
468                         status = "ST";
469                 else if (obd->obd_set_up)
470                         status = "UP";
471                 else if (obd->obd_attached)
472                         status = "AT";
473                 else
474                         status = "--";
475                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
476                          i, status, obd->obd_type->typ_name,
477                          obd->obd_name, obd->obd_uuid.uuid,
478                          cfs_atomic_read(&obd->obd_refcount));
479         }
480         cfs_spin_unlock(&obd_dev_lock);
481         return;
482 }
483
484 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
485    specified, then only the client with that uuid is returned,
486    otherwise any client connected to the tgt is returned. */
487 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
488                                           const char * typ_name,
489                                           struct obd_uuid *grp_uuid)
490 {
491         int i;
492
493         cfs_spin_lock(&obd_dev_lock);
494         for (i = 0; i < class_devno_max(); i++) {
495                 struct obd_device *obd = class_num2obd(i);
496                 if (obd == NULL)
497                         continue;
498                 if ((strncmp(obd->obd_type->typ_name, typ_name,
499                              strlen(typ_name)) == 0)) {
500                         if (obd_uuid_equals(tgt_uuid,
501                                             &obd->u.cli.cl_target_uuid) &&
502                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
503                                                          &obd->obd_uuid) : 1)) {
504                                 cfs_spin_unlock(&obd_dev_lock);
505                                 return obd;
506                         }
507                 }
508         }
509         cfs_spin_unlock(&obd_dev_lock);
510
511         return NULL;
512 }
513
514 /* Iterate the obd_device list looking devices have grp_uuid. Start
515    searching at *next, and if a device is found, the next index to look
516    at is saved in *next. If next is NULL, then the first matching device
517    will always be returned. */
518 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
519 {
520         int i;
521
522         if (next == NULL)
523                 i = 0;
524         else if (*next >= 0 && *next < class_devno_max())
525                 i = *next;
526         else
527                 return NULL;
528
529         cfs_spin_lock(&obd_dev_lock);
530         for (; i < class_devno_max(); i++) {
531                 struct obd_device *obd = class_num2obd(i);
532                 if (obd == NULL)
533                         continue;
534                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535                         if (next != NULL)
536                                 *next = i+1;
537                         cfs_spin_unlock(&obd_dev_lock);
538                         return obd;
539                 }
540         }
541         cfs_spin_unlock(&obd_dev_lock);
542
543         return NULL;
544 }
545
546 /**
547  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
548  * adjust sptlrpc settings accordingly.
549  */
550 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
551 {
552         struct obd_device  *obd;
553         const char         *type;
554         int                 i, rc = 0, rc2;
555
556         LASSERT(namelen > 0);
557
558         cfs_spin_lock(&obd_dev_lock);
559         for (i = 0; i < class_devno_max(); i++) {
560                 obd = class_num2obd(i);
561
562                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563                         continue;
564
565                 /* only notify mdc, osc, mdt, ost */
566                 type = obd->obd_type->typ_name;
567                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
569                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
570                     strcmp(type, LUSTRE_OST_NAME) != 0)
571                         continue;
572
573                 if (strncmp(obd->obd_name, fsname, namelen))
574                         continue;
575
576                 class_incref(obd, __FUNCTION__, obd);
577                 cfs_spin_unlock(&obd_dev_lock);
578                 rc2 = obd_set_info_async(obd->obd_self_export,
579                                          sizeof(KEY_SPTLRPC_CONF),
580                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
581                 rc = rc ? rc : rc2;
582                 class_decref(obd, __FUNCTION__, obd);
583                 cfs_spin_lock(&obd_dev_lock);
584         }
585         cfs_spin_unlock(&obd_dev_lock);
586         return rc;
587 }
588 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
589
590 void obd_cleanup_caches(void)
591 {
592         int rc;
593
594         ENTRY;
595         if (obd_device_cachep) {
596                 rc = cfs_mem_cache_destroy(obd_device_cachep);
597                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
598                 obd_device_cachep = NULL;
599         }
600         if (obdo_cachep) {
601                 rc = cfs_mem_cache_destroy(obdo_cachep);
602                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603                 obdo_cachep = NULL;
604         }
605         if (import_cachep) {
606                 rc = cfs_mem_cache_destroy(import_cachep);
607                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
608                 import_cachep = NULL;
609         }
610         if (capa_cachep) {
611                 rc = cfs_mem_cache_destroy(capa_cachep);
612                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
613                 capa_cachep = NULL;
614         }
615         EXIT;
616 }
617
618 int obd_init_caches(void)
619 {
620         ENTRY;
621
622         LASSERT(obd_device_cachep == NULL);
623         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
624                                                  sizeof(struct obd_device),
625                                                  0, 0);
626         if (!obd_device_cachep)
627                 GOTO(out, -ENOMEM);
628
629         LASSERT(obdo_cachep == NULL);
630         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
631                                            0, 0);
632         if (!obdo_cachep)
633                 GOTO(out, -ENOMEM);
634
635         LASSERT(import_cachep == NULL);
636         import_cachep = cfs_mem_cache_create("ll_import_cache",
637                                              sizeof(struct obd_import),
638                                              0, 0);
639         if (!import_cachep)
640                 GOTO(out, -ENOMEM);
641
642         LASSERT(capa_cachep == NULL);
643         capa_cachep = cfs_mem_cache_create("capa_cache",
644                                            sizeof(struct obd_capa), 0, 0);
645         if (!capa_cachep)
646                 GOTO(out, -ENOMEM);
647
648         RETURN(0);
649  out:
650         obd_cleanup_caches();
651         RETURN(-ENOMEM);
652
653 }
654
655 /* map connection to client */
656 struct obd_export *class_conn2export(struct lustre_handle *conn)
657 {
658         struct obd_export *export;
659         ENTRY;
660
661         if (!conn) {
662                 CDEBUG(D_CACHE, "looking for null handle\n");
663                 RETURN(NULL);
664         }
665
666         if (conn->cookie == -1) {  /* this means assign a new connection */
667                 CDEBUG(D_CACHE, "want a new connection\n");
668                 RETURN(NULL);
669         }
670
671         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
672         export = class_handle2object(conn->cookie);
673         RETURN(export);
674 }
675
676 struct obd_device *class_exp2obd(struct obd_export *exp)
677 {
678         if (exp)
679                 return exp->exp_obd;
680         return NULL;
681 }
682
683 struct obd_device *class_conn2obd(struct lustre_handle *conn)
684 {
685         struct obd_export *export;
686         export = class_conn2export(conn);
687         if (export) {
688                 struct obd_device *obd = export->exp_obd;
689                 class_export_put(export);
690                 return obd;
691         }
692         return NULL;
693 }
694
695 struct obd_import *class_exp2cliimp(struct obd_export *exp)
696 {
697         struct obd_device *obd = exp->exp_obd;
698         if (obd == NULL)
699                 return NULL;
700         return obd->u.cli.cl_import;
701 }
702
703 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
704 {
705         struct obd_device *obd = class_conn2obd(conn);
706         if (obd == NULL)
707                 return NULL;
708         return obd->u.cli.cl_import;
709 }
710
711 /* Export management functions */
712 static void class_export_destroy(struct obd_export *exp)
713 {
714         struct obd_device *obd = exp->exp_obd;
715         ENTRY;
716
717         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
718
719         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
720                exp->exp_client_uuid.uuid, obd->obd_name);
721
722         LASSERT(obd != NULL);
723
724         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
725         if (exp->exp_connection)
726                 ptlrpc_put_connection_superhack(exp->exp_connection);
727
728         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
729         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
730         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
731         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
732         obd_destroy_export(exp);
733         class_decref(obd, "export", exp);
734
735         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736         EXIT;
737 }
738
/* Handle-table callback (see class_new_export()): take a reference on
 * the export passed as an untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
743
744 struct obd_export *class_export_get(struct obd_export *exp)
745 {
746         cfs_atomic_inc(&exp->exp_refcount);
747         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
748                cfs_atomic_read(&exp->exp_refcount));
749         return exp;
750 }
751 EXPORT_SYMBOL(class_export_get);
752
753 void class_export_put(struct obd_export *exp)
754 {
755         LASSERT(exp != NULL);
756         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
757         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
758                cfs_atomic_read(&exp->exp_refcount) - 1);
759
760         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
761                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
762                 CDEBUG(D_IOCTL, "final put %p/%s\n",
763                        exp, exp->exp_client_uuid.uuid);
764
765                 /* release nid stat refererence */
766                 lprocfs_exp_cleanup(exp);
767
768                 obd_zombie_export_add(exp);
769         }
770 }
771 EXPORT_SYMBOL(class_export_put);
772
773 /* Creates a new export, adds it to the hash table, and returns a
774  * pointer to it. The refcount is 2: one for the hash reference, and
775  * one for the pointer returned by this function. */
776 struct obd_export *class_new_export(struct obd_device *obd,
777                                     struct obd_uuid *cluuid)
778 {
779         struct obd_export *export;
780         cfs_hash_t *hash = NULL;
781         int rc = 0;
782         ENTRY;
783
784         OBD_ALLOC_PTR(export);
785         if (!export)
786                 return ERR_PTR(-ENOMEM);
787
788         export->exp_conn_cnt = 0;
789         export->exp_lock_hash = NULL;
790         cfs_atomic_set(&export->exp_refcount, 2);
791         cfs_atomic_set(&export->exp_rpc_count, 0);
792         cfs_atomic_set(&export->exp_cb_count, 0);
793         cfs_atomic_set(&export->exp_locks_count, 0);
794 #if LUSTRE_TRACKS_LOCK_EXP_REFS
795         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
796         cfs_spin_lock_init(&export->exp_locks_list_guard);
797 #endif
798         cfs_atomic_set(&export->exp_replay_count, 0);
799         export->exp_obd = obd;
800         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
801         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
802         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
803         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
804         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
805         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
806         class_handle_hash(&export->exp_handle, export_handle_addref);
807         export->exp_last_request_time = cfs_time_current_sec();
808         cfs_spin_lock_init(&export->exp_lock);
809         cfs_spin_lock_init(&export->exp_rpc_lock);
810         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
811         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
812
813         export->exp_sp_peer = LUSTRE_SP_ANY;
814         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
815         export->exp_client_uuid = *cluuid;
816         obd_init_export(export);
817
818         cfs_spin_lock(&obd->obd_dev_lock);
819          /* shouldn't happen, but might race */
820         if (obd->obd_stopping)
821                 GOTO(exit_unlock, rc = -ENODEV);
822
823         hash = cfs_hash_getref(obd->obd_uuid_hash);
824         if (hash == NULL)
825                 GOTO(exit_unlock, rc = -ENODEV);
826         cfs_spin_unlock(&obd->obd_dev_lock);
827
828         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
829                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
830                 if (rc != 0) {
831                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
832                                       obd->obd_name, cluuid->uuid, rc);
833                         GOTO(exit_err, rc = -EALREADY);
834                 }
835         }
836
837         cfs_spin_lock(&obd->obd_dev_lock);
838         if (obd->obd_stopping) {
839                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
840                 GOTO(exit_unlock, rc = -ENODEV);
841         }
842
843         class_incref(obd, "export", export);
844         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
845         cfs_list_add_tail(&export->exp_obd_chain_timed,
846                           &export->exp_obd->obd_exports_timed);
847         export->exp_obd->obd_num_exports++;
848         cfs_spin_unlock(&obd->obd_dev_lock);
849         cfs_hash_putref(hash);
850         RETURN(export);
851
852 exit_unlock:
853         cfs_spin_unlock(&obd->obd_dev_lock);
854 exit_err:
855         if (hash)
856                 cfs_hash_putref(hash);
857         class_handle_unhash(&export->exp_handle);
858         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
859         obd_destroy_export(export);
860         OBD_FREE_PTR(export);
861         return ERR_PTR(rc);
862 }
863 EXPORT_SYMBOL(class_new_export);
864
865 void class_unlink_export(struct obd_export *exp)
866 {
867         class_handle_unhash(&exp->exp_handle);
868
869         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
870         /* delete an uuid-export hashitem from hashtables */
871         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
872                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
873                              &exp->exp_client_uuid,
874                              &exp->exp_uuid_hash);
875
876         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
877         cfs_list_del_init(&exp->exp_obd_chain_timed);
878         exp->exp_obd->obd_num_exports--;
879         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
880         class_export_put(exp);
881 }
882 EXPORT_SYMBOL(class_unlink_export);
883
884 /* Import management functions */
885 void class_import_destroy(struct obd_import *imp)
886 {
887         ENTRY;
888
889         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
890                 imp->imp_obd->obd_name);
891
892         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
893
894         ptlrpc_put_connection_superhack(imp->imp_connection);
895
896         while (!cfs_list_empty(&imp->imp_conn_list)) {
897                 struct obd_import_conn *imp_conn;
898
899                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
900                                           struct obd_import_conn, oic_item);
901                 cfs_list_del_init(&imp_conn->oic_item);
902                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
903                 OBD_FREE(imp_conn, sizeof(*imp_conn));
904         }
905
906         LASSERT(imp->imp_sec == NULL);
907         class_decref(imp->imp_obd, "import", imp);
908         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
909         EXIT;
910 }
911
/* Handle callback registered through class_handle_hash(): takes one more
 * reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
916
917 struct obd_import *class_import_get(struct obd_import *import)
918 {
919         cfs_atomic_inc(&import->imp_refcount);
920         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
921                cfs_atomic_read(&import->imp_refcount),
922                import->imp_obd->obd_name);
923         return import;
924 }
925 EXPORT_SYMBOL(class_import_get);
926
927 void class_import_put(struct obd_import *imp)
928 {
929         ENTRY;
930
931         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
932         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
933
934         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
935                cfs_atomic_read(&imp->imp_refcount) - 1,
936                imp->imp_obd->obd_name);
937
938         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
939                 CDEBUG(D_INFO, "final put import %p\n", imp);
940                 obd_zombie_import_add(imp);
941         }
942
943         EXIT;
944 }
945 EXPORT_SYMBOL(class_import_put);
946
947 static void init_imp_at(struct imp_at *at) {
948         int i;
949         at_init(&at->iat_net_latency, 0, 0);
950         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
951                 /* max service estimates are tracked on the server side, so
952                    don't use the AT history here, just use the last reported
953                    val. (But keep hist for proc histogram, worst_ever) */
954                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
955                         AT_FLG_NOHIST);
956         }
957 }
958
959 struct obd_import *class_new_import(struct obd_device *obd)
960 {
961         struct obd_import *imp;
962
963         OBD_ALLOC(imp, sizeof(*imp));
964         if (imp == NULL)
965                 return NULL;
966
967         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
968         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
969         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
970         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
971         cfs_spin_lock_init(&imp->imp_lock);
972         imp->imp_last_success_conn = 0;
973         imp->imp_state = LUSTRE_IMP_NEW;
974         imp->imp_obd = class_incref(obd, "import", imp);
975         cfs_sema_init(&imp->imp_sec_mutex, 1);
976         cfs_waitq_init(&imp->imp_recovery_waitq);
977
978         cfs_atomic_set(&imp->imp_refcount, 2);
979         cfs_atomic_set(&imp->imp_unregistering, 0);
980         cfs_atomic_set(&imp->imp_inflight, 0);
981         cfs_atomic_set(&imp->imp_replay_inflight, 0);
982         cfs_atomic_set(&imp->imp_inval_count, 0);
983         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
984         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
985         class_handle_hash(&imp->imp_handle, import_handle_addref);
986         init_imp_at(&imp->imp_at);
987
988         /* the default magic is V2, will be used in connect RPC, and
989          * then adjusted according to the flags in request/reply. */
990         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
991
992         return imp;
993 }
994 EXPORT_SYMBOL(class_new_import);
995
996 void class_destroy_import(struct obd_import *import)
997 {
998         LASSERT(import != NULL);
999         LASSERT(import != LP_POISON);
1000
1001         class_handle_unhash(&import->imp_handle);
1002
1003         cfs_spin_lock(&import->imp_lock);
1004         import->imp_generation++;
1005         cfs_spin_unlock(&import->imp_lock);
1006         class_import_put(import);
1007 }
1008 EXPORT_SYMBOL(class_destroy_import);
1009
1010 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1011
1012 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1013 {
1014         cfs_spin_lock(&exp->exp_locks_list_guard);
1015
1016         LASSERT(lock->l_exp_refs_nr >= 0);
1017
1018         if (lock->l_exp_refs_target != NULL &&
1019             lock->l_exp_refs_target != exp) {
1020                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1021                               exp, lock, lock->l_exp_refs_target);
1022         }
1023         if ((lock->l_exp_refs_nr ++) == 0) {
1024                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1025                 lock->l_exp_refs_target = exp;
1026         }
1027         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1028                lock, exp, lock->l_exp_refs_nr);
1029         cfs_spin_unlock(&exp->exp_locks_list_guard);
1030 }
1031 EXPORT_SYMBOL(__class_export_add_lock_ref);
1032
1033 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1034 {
1035         cfs_spin_lock(&exp->exp_locks_list_guard);
1036         LASSERT(lock->l_exp_refs_nr > 0);
1037         if (lock->l_exp_refs_target != exp) {
1038                 LCONSOLE_WARN("lock %p, "
1039                               "mismatching export pointers: %p, %p\n",
1040                               lock, lock->l_exp_refs_target, exp);
1041         }
1042         if (-- lock->l_exp_refs_nr == 0) {
1043                 cfs_list_del_init(&lock->l_exp_refs_link);
1044                 lock->l_exp_refs_target = NULL;
1045         }
1046         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1047                lock, exp, lock->l_exp_refs_nr);
1048         cfs_spin_unlock(&exp->exp_locks_list_guard);
1049 }
1050 EXPORT_SYMBOL(__class_export_del_lock_ref);
1051 #endif
1052
1053 /* A connection defines an export context in which preallocation can
1054    be managed. This releases the export pointer reference, and returns
1055    the export handle, so the export refcount is 1 when this function
1056    returns. */
1057 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1058                   struct obd_uuid *cluuid)
1059 {
1060         struct obd_export *export;
1061         LASSERT(conn != NULL);
1062         LASSERT(obd != NULL);
1063         LASSERT(cluuid != NULL);
1064         ENTRY;
1065
1066         export = class_new_export(obd, cluuid);
1067         if (IS_ERR(export))
1068                 RETURN(PTR_ERR(export));
1069
1070         conn->cookie = export->exp_handle.h_cookie;
1071         class_export_put(export);
1072
1073         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1074                cluuid->uuid, conn->cookie);
1075         RETURN(0);
1076 }
1077 EXPORT_SYMBOL(class_connect);
1078
1079 /* if export is involved in recovery then clean up related things */
1080 void class_export_recovery_cleanup(struct obd_export *exp)
1081 {
1082         struct obd_device *obd = exp->exp_obd;
1083
1084         cfs_spin_lock(&obd->obd_recovery_task_lock);
1085         if (exp->exp_delayed)
1086                 obd->obd_delayed_clients--;
1087         if (obd->obd_recovering && exp->exp_in_recovery) {
1088                 cfs_spin_lock(&exp->exp_lock);
1089                 exp->exp_in_recovery = 0;
1090                 cfs_spin_unlock(&exp->exp_lock);
1091                 LASSERT(obd->obd_connected_clients);
1092                 obd->obd_connected_clients--;
1093         }
1094         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1095         /** Cleanup req replay fields */
1096         if (exp->exp_req_replay_needed) {
1097                 cfs_spin_lock(&exp->exp_lock);
1098                 exp->exp_req_replay_needed = 0;
1099                 cfs_spin_unlock(&exp->exp_lock);
1100                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1101                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1102         }
1103         /** Cleanup lock replay data */
1104         if (exp->exp_lock_replay_needed) {
1105                 cfs_spin_lock(&exp->exp_lock);
1106                 exp->exp_lock_replay_needed = 0;
1107                 cfs_spin_unlock(&exp->exp_lock);
1108                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1109                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1110         }
1111 }
1112
1113 /* This function removes 1-3 references from the export:
1114  * 1 - for export pointer passed
1115  * and if disconnect really need
1116  * 2 - removing from hash
1117  * 3 - in client_unlink_export
1118  * The export pointer passed to this function can destroyed */
1119 int class_disconnect(struct obd_export *export)
1120 {
1121         int already_disconnected;
1122         ENTRY;
1123
1124         if (export == NULL) {
1125                 fixme();
1126                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1127                 RETURN(-EINVAL);
1128         }
1129
1130         cfs_spin_lock(&export->exp_lock);
1131         already_disconnected = export->exp_disconnected;
1132         export->exp_disconnected = 1;
1133         cfs_spin_unlock(&export->exp_lock);
1134
1135         /* class_cleanup(), abort_recovery(), and class_fail_export()
1136          * all end up in here, and if any of them race we shouldn't
1137          * call extra class_export_puts(). */
1138         if (already_disconnected) {
1139                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1140                 GOTO(no_disconn, already_disconnected);
1141         }
1142
1143         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1144                export->exp_handle.h_cookie);
1145
1146         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1147                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1148                              &export->exp_connection->c_peer.nid,
1149                              &export->exp_nid_hash);
1150
1151         class_export_recovery_cleanup(export);
1152         class_unlink_export(export);
1153 no_disconn:
1154         class_export_put(export);
1155         RETURN(0);
1156 }
1157
1158 /* Return non-zero for a fully connected export */
1159 int class_connected_export(struct obd_export *exp)
1160 {
1161         if (exp) {
1162                 int connected;
1163                 cfs_spin_lock(&exp->exp_lock);
1164                 connected = (exp->exp_conn_cnt > 0);
1165                 cfs_spin_unlock(&exp->exp_lock);
1166                 return connected;
1167         }
1168         return 0;
1169 }
1170 EXPORT_SYMBOL(class_connected_export);
1171
1172 static void class_disconnect_export_list(cfs_list_t *list,
1173                                          enum obd_option flags)
1174 {
1175         int rc;
1176         struct obd_export *exp;
1177         ENTRY;
1178
1179         /* It's possible that an export may disconnect itself, but
1180          * nothing else will be added to this list. */
1181         while (!cfs_list_empty(list)) {
1182                 exp = cfs_list_entry(list->next, struct obd_export,
1183                                      exp_obd_chain);
1184                 /* need for safe call CDEBUG after obd_disconnect */
1185                 class_export_get(exp);
1186
1187                 cfs_spin_lock(&exp->exp_lock);
1188                 exp->exp_flags = flags;
1189                 cfs_spin_unlock(&exp->exp_lock);
1190
1191                 if (obd_uuid_equals(&exp->exp_client_uuid,
1192                                     &exp->exp_obd->obd_uuid)) {
1193                         CDEBUG(D_HA,
1194                                "exp %p export uuid == obd uuid, don't discon\n",
1195                                exp);
1196                         /* Need to delete this now so we don't end up pointing
1197                          * to work_list later when this export is cleaned up. */
1198                         cfs_list_del_init(&exp->exp_obd_chain);
1199                         class_export_put(exp);
1200                         continue;
1201                 }
1202
1203                 class_export_get(exp);
1204                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1205                        "last request at "CFS_TIME_T"\n",
1206                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1207                        exp, exp->exp_last_request_time);
1208                 /* release one export reference anyway */
1209                 rc = obd_disconnect(exp);
1210
1211                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1212                        obd_export_nid2str(exp), exp, rc);
1213                 class_export_put(exp);
1214         }
1215         EXIT;
1216 }
1217
1218 void class_disconnect_exports(struct obd_device *obd)
1219 {
1220         cfs_list_t work_list;
1221         ENTRY;
1222
1223         /* Move all of the exports from obd_exports to a work list, en masse. */
1224         CFS_INIT_LIST_HEAD(&work_list);
1225         cfs_spin_lock(&obd->obd_dev_lock);
1226         cfs_list_splice_init(&obd->obd_exports, &work_list);
1227         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1228         cfs_spin_unlock(&obd->obd_dev_lock);
1229
1230         if (!cfs_list_empty(&work_list)) {
1231                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1232                        "disconnecting them\n", obd->obd_minor, obd);
1233                 class_disconnect_export_list(&work_list,
1234                                              exp_flags_from_obd(obd));
1235         } else
1236                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1237                        obd->obd_minor, obd);
1238         EXIT;
1239 }
1240 EXPORT_SYMBOL(class_disconnect_exports);
1241
1242 /* Remove exports that have not completed recovery.
1243  */
1244 void class_disconnect_stale_exports(struct obd_device *obd,
1245                                     int (*test_export)(struct obd_export *))
1246 {
1247         cfs_list_t work_list;
1248         cfs_list_t *pos, *n;
1249         struct obd_export *exp;
1250         int evicted = 0;
1251         ENTRY;
1252
1253         CFS_INIT_LIST_HEAD(&work_list);
1254         cfs_spin_lock(&obd->obd_dev_lock);
1255         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1256                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1257                 if (test_export(exp))
1258                         continue;
1259
1260                 /* don't count self-export as client */
1261                 if (obd_uuid_equals(&exp->exp_client_uuid,
1262                                     &exp->exp_obd->obd_uuid))
1263                         continue;
1264
1265                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1266                 evicted++;
1267                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1268                        obd->obd_name, exp->exp_client_uuid.uuid,
1269                        exp->exp_connection == NULL ? "<unknown>" :
1270                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1271                 print_export_data(exp, "EVICTING", 0);
1272         }
1273         cfs_spin_unlock(&obd->obd_dev_lock);
1274
1275         if (evicted) {
1276                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1277                        obd->obd_name, evicted);
1278                 obd->obd_stale_clients += evicted;
1279         }
1280         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1281                                                  OBD_OPT_ABORT_RECOV);
1282         EXIT;
1283 }
1284 EXPORT_SYMBOL(class_disconnect_stale_exports);
1285
1286 void class_fail_export(struct obd_export *exp)
1287 {
1288         int rc, already_failed;
1289
1290         cfs_spin_lock(&exp->exp_lock);
1291         already_failed = exp->exp_failed;
1292         exp->exp_failed = 1;
1293         cfs_spin_unlock(&exp->exp_lock);
1294
1295         if (already_failed) {
1296                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1297                        exp, exp->exp_client_uuid.uuid);
1298                 return;
1299         }
1300
1301         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1302                exp, exp->exp_client_uuid.uuid);
1303
1304         if (obd_dump_on_timeout)
1305                 libcfs_debug_dumplog();
1306
1307         /* Most callers into obd_disconnect are removing their own reference
1308          * (request, for example) in addition to the one from the hash table.
1309          * We don't have such a reference here, so make one. */
1310         class_export_get(exp);
1311         rc = obd_disconnect(exp);
1312         if (rc)
1313                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1314         else
1315                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1316                        exp, exp->exp_client_uuid.uuid);
1317 }
1318 EXPORT_SYMBOL(class_fail_export);
1319
1320 char *obd_export_nid2str(struct obd_export *exp)
1321 {
1322         if (exp->exp_connection != NULL)
1323                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1324
1325         return "(no nid)";
1326 }
1327 EXPORT_SYMBOL(obd_export_nid2str);
1328
1329 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1330 {
1331         struct obd_export *doomed_exp = NULL;
1332         int exports_evicted = 0;
1333
1334         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1335
1336         do {
1337                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1338                 if (doomed_exp == NULL)
1339                         break;
1340
1341                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1342                          "nid %s found, wanted nid %s, requested nid %s\n",
1343                          obd_export_nid2str(doomed_exp),
1344                          libcfs_nid2str(nid_key), nid);
1345                 LASSERTF(doomed_exp != obd->obd_self_export,
1346                          "self-export is hashed by NID?\n");
1347                 exports_evicted++;
1348                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1349                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1350                        exports_evicted);
1351                 class_fail_export(doomed_exp);
1352                 class_export_put(doomed_exp);
1353         } while (1);
1354
1355         if (!exports_evicted)
1356                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1357                        obd->obd_name, nid);
1358         return exports_evicted;
1359 }
1360 EXPORT_SYMBOL(obd_export_evict_by_nid);
1361
1362 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1363 {
1364         struct obd_export *doomed_exp = NULL;
1365         struct obd_uuid doomed_uuid;
1366         int exports_evicted = 0;
1367
1368         obd_str2uuid(&doomed_uuid, uuid);
1369         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1370                 CERROR("%s: can't evict myself\n", obd->obd_name);
1371                 return exports_evicted;
1372         }
1373
1374         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1375
1376         if (doomed_exp == NULL) {
1377                 CERROR("%s: can't disconnect %s: no exports found\n",
1378                        obd->obd_name, uuid);
1379         } else {
1380                 CWARN("%s: evicting %s at adminstrative request\n",
1381                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1382                 class_fail_export(doomed_exp);
1383                 class_export_put(doomed_exp);
1384                 exports_evicted++;
1385         }
1386
1387         return exports_evicted;
1388 }
1389 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1390
1391 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1392 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1393 EXPORT_SYMBOL(class_export_dump_hook);
1394 #endif
1395
1396 static void print_export_data(struct obd_export *exp, const char *status,
1397                               int locks)
1398 {
1399         struct ptlrpc_reply_state *rs;
1400         struct ptlrpc_reply_state *first_reply = NULL;
1401         int nreplies = 0;
1402
1403         cfs_spin_lock(&exp->exp_lock);
1404         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1405                                 rs_exp_list) {
1406                 if (nreplies == 0)
1407                         first_reply = rs;
1408                 nreplies++;
1409         }
1410         cfs_spin_unlock(&exp->exp_lock);
1411
1412         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1413                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1414                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1415                cfs_atomic_read(&exp->exp_rpc_count),
1416                cfs_atomic_read(&exp->exp_cb_count),
1417                cfs_atomic_read(&exp->exp_locks_count),
1418                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1419                nreplies, first_reply, nreplies > 3 ? "..." : "",
1420                exp->exp_last_committed);
1421 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1422         if (locks && class_export_dump_hook != NULL)
1423                 class_export_dump_hook(exp);
1424 #endif
1425 }
1426
1427 void dump_exports(struct obd_device *obd, int locks)
1428 {
1429         struct obd_export *exp;
1430
1431         cfs_spin_lock(&obd->obd_dev_lock);
1432         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1433                 print_export_data(exp, "ACTIVE", locks);
1434         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1435                 print_export_data(exp, "UNLINKED", locks);
1436         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1437                 print_export_data(exp, "DELAYED", locks);
1438         cfs_spin_unlock(&obd->obd_dev_lock);
1439         cfs_spin_lock(&obd_zombie_impexp_lock);
1440         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1441                 print_export_data(exp, "ZOMBIE", locks);
1442         cfs_spin_unlock(&obd_zombie_impexp_lock);
1443 }
1444 EXPORT_SYMBOL(dump_exports);
1445
1446 void obd_exports_barrier(struct obd_device *obd)
1447 {
1448         int waited = 2;
1449         LASSERT(cfs_list_empty(&obd->obd_exports));
1450         cfs_spin_lock(&obd->obd_dev_lock);
1451         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1452                 cfs_spin_unlock(&obd->obd_dev_lock);
1453                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1454                                                    cfs_time_seconds(waited));
1455                 if (waited > 5 && IS_PO2(waited)) {
1456                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1457                                       "more than %d seconds. "
1458                                       "The obd refcount = %d. Is it stuck?\n",
1459                                       obd->obd_name, waited,
1460                                       cfs_atomic_read(&obd->obd_refcount));
1461                         dump_exports(obd, 1);
1462                 }
1463                 waited *= 2;
1464                 cfs_spin_lock(&obd->obd_dev_lock);
1465         }
1466         cfs_spin_unlock(&obd->obd_dev_lock);
1467 }
1468 EXPORT_SYMBOL(obd_exports_barrier);
1469
1470 /* Total amount of zombies to be destroyed */
1471 static int zombies_count = 0;
1472
1473 /**
1474  * kill zombie imports and exports
1475  */
1476 void obd_zombie_impexp_cull(void)
1477 {
1478         struct obd_import *import;
1479         struct obd_export *export;
1480         ENTRY;
1481
1482         do {
1483                 cfs_spin_lock(&obd_zombie_impexp_lock);
1484
1485                 import = NULL;
1486                 if (!cfs_list_empty(&obd_zombie_imports)) {
1487                         import = cfs_list_entry(obd_zombie_imports.next,
1488                                                 struct obd_import,
1489                                                 imp_zombie_chain);
1490                         cfs_list_del_init(&import->imp_zombie_chain);
1491                 }
1492
1493                 export = NULL;
1494                 if (!cfs_list_empty(&obd_zombie_exports)) {
1495                         export = cfs_list_entry(obd_zombie_exports.next,
1496                                                 struct obd_export,
1497                                                 exp_obd_chain);
1498                         cfs_list_del_init(&export->exp_obd_chain);
1499                 }
1500
1501                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1502
1503                 if (import != NULL) {
1504                         class_import_destroy(import);
1505                         cfs_spin_lock(&obd_zombie_impexp_lock);
1506                         zombies_count--;
1507                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1508                 }
1509
1510                 if (export != NULL) {
1511                         class_export_destroy(export);
1512                         cfs_spin_lock(&obd_zombie_impexp_lock);
1513                         zombies_count--;
1514                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1515                 }
1516
1517                 cfs_cond_resched();
1518         } while (import != NULL || export != NULL);
1519         EXIT;
1520 }
1521
1522 static cfs_completion_t         obd_zombie_start;
1523 static cfs_completion_t         obd_zombie_stop;
1524 static unsigned long            obd_zombie_flags;
1525 static cfs_waitq_t              obd_zombie_waitq;
1526 static pid_t                    obd_zombie_pid;
1527
1528 enum {
1529         OBD_ZOMBIE_STOP   = 1 << 1
1530 };
1531
1532 /**
1533  * check for work for kill zombie import/export thread.
1534  */
1535 static int obd_zombie_impexp_check(void *arg)
1536 {
1537         int rc;
1538
1539         cfs_spin_lock(&obd_zombie_impexp_lock);
1540         rc = (zombies_count == 0) &&
1541              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1542         cfs_spin_unlock(&obd_zombie_impexp_lock);
1543
1544         RETURN(rc);
1545 }
1546
1547 /**
1548  * Add export to the obd_zombe thread and notify it.
1549  */
1550 static void obd_zombie_export_add(struct obd_export *exp) {
1551         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1552         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1553         cfs_list_del_init(&exp->exp_obd_chain);
1554         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1555         cfs_spin_lock(&obd_zombie_impexp_lock);
1556         zombies_count++;
1557         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1558         cfs_spin_unlock(&obd_zombie_impexp_lock);
1559
1560         if (obd_zombie_impexp_notify != NULL)
1561                 obd_zombie_impexp_notify();
1562 }
1563
1564 /**
1565  * Add import to the obd_zombe thread and notify it.
1566  */
1567 static void obd_zombie_import_add(struct obd_import *imp) {
1568         LASSERT(imp->imp_sec == NULL);
1569         cfs_spin_lock(&obd_zombie_impexp_lock);
1570         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1571         zombies_count++;
1572         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1573         cfs_spin_unlock(&obd_zombie_impexp_lock);
1574
1575         if (obd_zombie_impexp_notify != NULL)
1576                 obd_zombie_impexp_notify();
1577 }
1578
1579 /**
1580  * notify import/export destroy thread about new zombie.
1581  */
1582 static void obd_zombie_impexp_notify(void)
1583 {
1584         /*
1585          * Make sure obd_zomebie_impexp_thread get this notification.
1586          * It is possible this signal only get by obd_zombie_barrier, and
1587          * barrier gulps this notification and sleeps away and hangs ensues
1588          */
1589         cfs_waitq_broadcast(&obd_zombie_waitq);
1590 }
1591
1592 /**
1593  * check whether obd_zombie is idle
1594  */
1595 static int obd_zombie_is_idle(void)
1596 {
1597         int rc;
1598
1599         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1600         cfs_spin_lock(&obd_zombie_impexp_lock);
1601         rc = (zombies_count == 0);
1602         cfs_spin_unlock(&obd_zombie_impexp_lock);
1603         return rc;
1604 }
1605
1606 /**
1607  * wait when obd_zombie import/export queues become empty
1608  */
1609 void obd_zombie_barrier(void)
1610 {
1611         struct l_wait_info lwi = { 0 };
1612
1613         if (obd_zombie_pid == cfs_curproc_pid())
1614                 /* don't wait for myself */
1615                 return;
1616         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1617 }
1618 EXPORT_SYMBOL(obd_zombie_barrier);
1619
1620 #ifdef __KERNEL__
1621
1622 /**
1623  * destroy zombie export/import thread.
1624  */
1625 static int obd_zombie_impexp_thread(void *unused)
1626 {
1627         int rc;
1628
1629         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1630                 cfs_complete(&obd_zombie_start);
1631                 RETURN(rc);
1632         }
1633
1634         cfs_complete(&obd_zombie_start);
1635
1636         obd_zombie_pid = cfs_curproc_pid();
1637
1638         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1639                 struct l_wait_info lwi = { 0 };
1640
1641                 l_wait_event(obd_zombie_waitq,
1642                              !obd_zombie_impexp_check(NULL), &lwi);
1643                 obd_zombie_impexp_cull();
1644
1645                 /*
1646                  * Notify obd_zombie_barrier callers that queues
1647                  * may be empty.
1648                  */
1649                 cfs_waitq_signal(&obd_zombie_waitq);
1650         }
1651
1652         cfs_complete(&obd_zombie_stop);
1653
1654         RETURN(0);
1655 }
1656
1657 #else /* ! KERNEL */
1658
1659 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1660 static void *obd_zombie_impexp_work_cb;
1661 static void *obd_zombie_impexp_idle_cb;
1662
1663 int obd_zombie_impexp_kill(void *arg)
1664 {
1665         int rc = 0;
1666
1667         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1668                 obd_zombie_impexp_cull();
1669                 rc = 1;
1670         }
1671         cfs_atomic_dec(&zombie_recur);
1672         return rc;
1673 }
1674
1675 #endif
1676
1677 /**
1678  * start destroy zombie import/export thread
1679  */
1680 int obd_zombie_impexp_init(void)
1681 {
1682         int rc;
1683
1684         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1685         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1686         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1687         cfs_init_completion(&obd_zombie_start);
1688         cfs_init_completion(&obd_zombie_stop);
1689         cfs_waitq_init(&obd_zombie_waitq);
1690         obd_zombie_pid = 0;
1691
1692 #ifdef __KERNEL__
1693         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1694         if (rc < 0)
1695                 RETURN(rc);
1696
1697         cfs_wait_for_completion(&obd_zombie_start);
1698 #else
1699
1700         obd_zombie_impexp_work_cb =
1701                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1702                                                  &obd_zombie_impexp_kill, NULL);
1703
1704         obd_zombie_impexp_idle_cb =
1705                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1706                                                  &obd_zombie_impexp_check, NULL);
1707         rc = 0;
1708 #endif
1709         RETURN(rc);
1710 }
1711 /**
1712  * stop destroy zombie import/export thread
1713  */
1714 void obd_zombie_impexp_stop(void)
1715 {
1716         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1717         obd_zombie_impexp_notify();
1718 #ifdef __KERNEL__
1719         cfs_wait_for_completion(&obd_zombie_stop);
1720 #else
1721         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1722         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1723 #endif
1724 }
1725
1726 /***** Kernel-userspace comm helpers *******/
1727
1728 /* Get length of entire message, including header */
1729 int kuc_len(int payload_len)
1730 {
1731         return sizeof(struct kuc_hdr) + payload_len;
1732 }
1733 EXPORT_SYMBOL(kuc_len);
1734
1735 /* Get a pointer to kuc header, given a ptr to the payload
1736  * @param p Pointer to payload area
1737  * @returns Pointer to kuc header
1738  */
1739 struct kuc_hdr * kuc_ptr(void *p)
1740 {
1741         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1742         LASSERT(lh->kuc_magic == KUC_MAGIC);
1743         return lh;
1744 }
1745 EXPORT_SYMBOL(kuc_ptr);
1746
1747 /* Test if payload is part of kuc message
1748  * @param p Pointer to payload area
1749  * @returns boolean
1750  */
1751 int kuc_ispayload(void *p)
1752 {
1753         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1754
1755         if (kh->kuc_magic == KUC_MAGIC)
1756                 return 1;
1757         else
1758                 return 0;
1759 }
1760 EXPORT_SYMBOL(kuc_ispayload);
1761
1762 /* Alloc space for a message, and fill in header
1763  * @return Pointer to payload area
1764  */
1765 void *kuc_alloc(int payload_len, int transport, int type)
1766 {
1767         struct kuc_hdr *lh;
1768         int len = kuc_len(payload_len);
1769
1770         OBD_ALLOC(lh, len);
1771         if (lh == NULL)
1772                 return ERR_PTR(-ENOMEM);
1773
1774         lh->kuc_magic = KUC_MAGIC;
1775         lh->kuc_transport = transport;
1776         lh->kuc_msgtype = type;
1777         lh->kuc_msglen = len;
1778
1779         return (void *)(lh + 1);
1780 }
1781 EXPORT_SYMBOL(kuc_alloc);
1782
/* Release a kuc message.
 * The header is located with kuc_ptr() and kuc_len(payload_len) bytes are
 * freed, so \a payload_len is expected to be the payload size the message
 * was allocated with.
 *
 * Deliberately not declared "inline": the function is exported, and under
 * C99/C11 inline rules a plain "inline" definition is not guaranteed to
 * emit the external symbol that EXPORT_SYMBOL() and other callers need.
 *
 * @param p Pointer to payload area
 * @param payload_len Size of the payload area
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1790
1791
1792