Whamcloud - gitweb
e0a9416e86648cbb54fed846160d3a01a237c9c8
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types; defined elsewhere, walked by
 * class_search_type() and extended by class_register_type(). */
50 extern cfs_list_t obd_types;
/* Held while obd_types is traversed or modified in this file. */
51 cfs_spinlock_t obd_types_lock;
52
/* Slab cache for struct obd_device, used by obd_device_alloc() and
 * obd_device_free(); created in obd_init_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
/* Slab cache sized for struct obdo; created in obd_init_caches(). */
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
/* Slab cache sized for struct obd_import; created in obd_init_caches(). */
56 cfs_mem_cache_t *import_cachep;
57
/* Zombie import/export bookkeeping.  NOTE(review): the code that fills and
 * drains these lists is not visible in this part of the file; presumably
 * obd_zombie_impexp_lock protects both lists -- confirm. */
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
/* Called from class_export_put() when the last export reference is dropped. */
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
/* Hook used by class_export_destroy() and class_import_destroy() to drop a
 * ptlrpc connection; it is assigned outside this part of the file. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         cfs_spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         cfs_spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         cfs_spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122                 if (!cfs_request_module("%s", modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 cfs_spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 cfs_try_module_get(type->typ_dt_ops->o_owner);
135                 cfs_spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         cfs_spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         cfs_module_put(type->typ_dt_ops->o_owner);
146         cfs_spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         cfs_spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         cfs_spin_lock(&obd_types_lock);
205         cfs_list_add(&type->typ_chain, &obd_types);
206         cfs_spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         cfs_spin_lock(&obd_types_lock);
248         cfs_list_del(&type->typ_chain);
249         cfs_spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param[in] type_name obd device type string.
265  * \param[in] name      obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         cfs_spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         cfs_spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         cfs_spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         cfs_spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         cfs_spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 cfs_spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         cfs_spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         cfs_spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         cfs_spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         cfs_spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         cfs_spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          cfs_atomic_read(&obd->obd_refcount));
476         }
477         cfs_spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         cfs_spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 cfs_spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         cfs_spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         cfs_spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         cfs_spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         cfs_spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         cfs_spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 cfs_spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 cfs_spin_lock(&obd_dev_lock);
581         }
582         cfs_spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
726         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
728         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
/* Addref callback registered with class_handle_hash() for exports. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         cfs_atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                cfs_atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                cfs_atomic_read(&exp->exp_refcount) - 1);
756
757         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
758                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
759                 CDEBUG(D_IOCTL, "final put %p/%s\n",
760                        exp, exp->exp_client_uuid.uuid);
761
762                 /* release nid stat refererence */
763                 lprocfs_exp_cleanup(exp);
764
765                 obd_zombie_export_add(exp);
766         }
767 }
768 EXPORT_SYMBOL(class_export_put);
769
770 /* Creates a new export, adds it to the hash table, and returns a
771  * pointer to it. The refcount is 2: one for the hash reference, and
772  * one for the pointer returned by this function. */
773 struct obd_export *class_new_export(struct obd_device *obd,
774                                     struct obd_uuid *cluuid)
775 {
776         struct obd_export *export;
777         cfs_hash_t *hash = NULL;
778         int rc = 0;
779         ENTRY;
780
781         OBD_ALLOC_PTR(export);
782         if (!export)
783                 return ERR_PTR(-ENOMEM);
784
785         export->exp_conn_cnt = 0;
786         export->exp_lock_hash = NULL;
787         cfs_atomic_set(&export->exp_refcount, 2);
788         cfs_atomic_set(&export->exp_rpc_count, 0);
789         cfs_atomic_set(&export->exp_cb_count, 0);
790         cfs_atomic_set(&export->exp_locks_count, 0);
791 #if LUSTRE_TRACKS_LOCK_EXP_REFS
792         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
793         cfs_spin_lock_init(&export->exp_locks_list_guard);
794 #endif
795         cfs_atomic_set(&export->exp_replay_count, 0);
796         export->exp_obd = obd;
797         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
798         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
799         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
800         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
801         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
802         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
803         class_handle_hash(&export->exp_handle, export_handle_addref);
804         export->exp_last_request_time = cfs_time_current_sec();
805         cfs_spin_lock_init(&export->exp_lock);
806         cfs_spin_lock_init(&export->exp_rpc_lock);
807         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
808         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
809
810         export->exp_sp_peer = LUSTRE_SP_ANY;
811         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
812         export->exp_client_uuid = *cluuid;
813         obd_init_export(export);
814
815         cfs_spin_lock(&obd->obd_dev_lock);
816          /* shouldn't happen, but might race */
817         if (obd->obd_stopping)
818                 GOTO(exit_unlock, rc = -ENODEV);
819
820         hash = cfs_hash_getref(obd->obd_uuid_hash);
821         if (hash == NULL)
822                 GOTO(exit_unlock, rc = -ENODEV);
823         cfs_spin_unlock(&obd->obd_dev_lock);
824
825         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
826                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
827                 if (rc != 0) {
828                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
829                                       obd->obd_name, cluuid->uuid, rc);
830                         GOTO(exit_err, rc = -EALREADY);
831                 }
832         }
833
834         cfs_spin_lock(&obd->obd_dev_lock);
835         if (obd->obd_stopping) {
836                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
837                 GOTO(exit_unlock, rc = -ENODEV);
838         }
839
840         class_incref(obd, "export", export);
841         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
842         cfs_list_add_tail(&export->exp_obd_chain_timed,
843                           &export->exp_obd->obd_exports_timed);
844         export->exp_obd->obd_num_exports++;
845         cfs_spin_unlock(&obd->obd_dev_lock);
846         cfs_hash_putref(hash);
847         RETURN(export);
848
849 exit_unlock:
850         cfs_spin_unlock(&obd->obd_dev_lock);
851 exit_err:
852         if (hash)
853                 cfs_hash_putref(hash);
854         class_handle_unhash(&export->exp_handle);
855         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
856         obd_destroy_export(export);
857         OBD_FREE_PTR(export);
858         return ERR_PTR(rc);
859 }
860 EXPORT_SYMBOL(class_new_export);
861
862 void class_unlink_export(struct obd_export *exp)
863 {
864         class_handle_unhash(&exp->exp_handle);
865
866         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
867         /* delete an uuid-export hashitem from hashtables */
868         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
869                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
870                              &exp->exp_client_uuid,
871                              &exp->exp_uuid_hash);
872
873         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
874         cfs_list_del_init(&exp->exp_obd_chain_timed);
875         exp->exp_obd->obd_num_exports--;
876         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
877         class_export_put(exp);
878 }
879 EXPORT_SYMBOL(class_unlink_export);
880
881 /* Import management functions */
882 void class_import_destroy(struct obd_import *imp)
883 {
884         ENTRY;
885
886         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
887                 imp->imp_obd->obd_name);
888
889         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
890
891         ptlrpc_put_connection_superhack(imp->imp_connection);
892
893         while (!cfs_list_empty(&imp->imp_conn_list)) {
894                 struct obd_import_conn *imp_conn;
895
896                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
897                                           struct obd_import_conn, oic_item);
898                 cfs_list_del_init(&imp_conn->oic_item);
899                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
900                 OBD_FREE(imp_conn, sizeof(*imp_conn));
901         }
902
903         LASSERT(imp->imp_sec == NULL);
904         class_decref(imp->imp_obd, "import", imp);
905         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
906         EXIT;
907 }
908
/*
 * Callback registered with class_handle_hash() for imports: takes one
 * reference on the import passed as an untyped pointer.
 */
static void import_handle_addref(void *import)
{
        class_import_get((struct obd_import *)import);
}
913
914 struct obd_import *class_import_get(struct obd_import *import)
915 {
916         cfs_atomic_inc(&import->imp_refcount);
917         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
918                cfs_atomic_read(&import->imp_refcount),
919                import->imp_obd->obd_name);
920         return import;
921 }
922 EXPORT_SYMBOL(class_import_get);
923
924 void class_import_put(struct obd_import *imp)
925 {
926         ENTRY;
927
928         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
929         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
930
931         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
932                cfs_atomic_read(&imp->imp_refcount) - 1,
933                imp->imp_obd->obd_name);
934
935         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
936                 CDEBUG(D_INFO, "final put import %p\n", imp);
937                 obd_zombie_import_add(imp);
938         }
939
940         EXIT;
941 }
942 EXPORT_SYMBOL(class_import_put);
943
944 static void init_imp_at(struct imp_at *at) {
945         int i;
946         at_init(&at->iat_net_latency, 0, 0);
947         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
948                 /* max service estimates are tracked on the server side, so
949                    don't use the AT history here, just use the last reported
950                    val. (But keep hist for proc histogram, worst_ever) */
951                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
952                         AT_FLG_NOHIST);
953         }
954 }
955
956 struct obd_import *class_new_import(struct obd_device *obd)
957 {
958         struct obd_import *imp;
959
960         OBD_ALLOC(imp, sizeof(*imp));
961         if (imp == NULL)
962                 return NULL;
963
964         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
965         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
966         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
967         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
968         cfs_spin_lock_init(&imp->imp_lock);
969         imp->imp_last_success_conn = 0;
970         imp->imp_state = LUSTRE_IMP_NEW;
971         imp->imp_obd = class_incref(obd, "import", imp);
972         cfs_sema_init(&imp->imp_sec_mutex, 1);
973         cfs_waitq_init(&imp->imp_recovery_waitq);
974
975         cfs_atomic_set(&imp->imp_refcount, 2);
976         cfs_atomic_set(&imp->imp_unregistering, 0);
977         cfs_atomic_set(&imp->imp_inflight, 0);
978         cfs_atomic_set(&imp->imp_replay_inflight, 0);
979         cfs_atomic_set(&imp->imp_inval_count, 0);
980         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
981         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
982         class_handle_hash(&imp->imp_handle, import_handle_addref);
983         init_imp_at(&imp->imp_at);
984
985         /* the default magic is V2, will be used in connect RPC, and
986          * then adjusted according to the flags in request/reply. */
987         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
988
989         return imp;
990 }
991 EXPORT_SYMBOL(class_new_import);
992
993 void class_destroy_import(struct obd_import *import)
994 {
995         LASSERT(import != NULL);
996         LASSERT(import != LP_POISON);
997
998         class_handle_unhash(&import->imp_handle);
999
1000         cfs_spin_lock(&import->imp_lock);
1001         import->imp_generation++;
1002         cfs_spin_unlock(&import->imp_lock);
1003         class_import_put(import);
1004 }
1005 EXPORT_SYMBOL(class_destroy_import);
1006
1007 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1008
1009 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1010 {
1011         cfs_spin_lock(&exp->exp_locks_list_guard);
1012
1013         LASSERT(lock->l_exp_refs_nr >= 0);
1014
1015         if (lock->l_exp_refs_target != NULL &&
1016             lock->l_exp_refs_target != exp) {
1017                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1018                               exp, lock, lock->l_exp_refs_target);
1019         }
1020         if ((lock->l_exp_refs_nr ++) == 0) {
1021                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1022                 lock->l_exp_refs_target = exp;
1023         }
1024         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1025                lock, exp, lock->l_exp_refs_nr);
1026         cfs_spin_unlock(&exp->exp_locks_list_guard);
1027 }
1028 EXPORT_SYMBOL(__class_export_add_lock_ref);
1029
1030 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1031 {
1032         cfs_spin_lock(&exp->exp_locks_list_guard);
1033         LASSERT(lock->l_exp_refs_nr > 0);
1034         if (lock->l_exp_refs_target != exp) {
1035                 LCONSOLE_WARN("lock %p, "
1036                               "mismatching export pointers: %p, %p\n",
1037                               lock, lock->l_exp_refs_target, exp);
1038         }
1039         if (-- lock->l_exp_refs_nr == 0) {
1040                 cfs_list_del_init(&lock->l_exp_refs_link);
1041                 lock->l_exp_refs_target = NULL;
1042         }
1043         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1044                lock, exp, lock->l_exp_refs_nr);
1045         cfs_spin_unlock(&exp->exp_locks_list_guard);
1046 }
1047 EXPORT_SYMBOL(__class_export_del_lock_ref);
1048 #endif
1049
1050 /* A connection defines an export context in which preallocation can
1051    be managed. This releases the export pointer reference, and returns
1052    the export handle, so the export refcount is 1 when this function
1053    returns. */
1054 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1055                   struct obd_uuid *cluuid)
1056 {
1057         struct obd_export *export;
1058         LASSERT(conn != NULL);
1059         LASSERT(obd != NULL);
1060         LASSERT(cluuid != NULL);
1061         ENTRY;
1062
1063         export = class_new_export(obd, cluuid);
1064         if (IS_ERR(export))
1065                 RETURN(PTR_ERR(export));
1066
1067         conn->cookie = export->exp_handle.h_cookie;
1068         class_export_put(export);
1069
1070         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1071                cluuid->uuid, conn->cookie);
1072         RETURN(0);
1073 }
1074 EXPORT_SYMBOL(class_connect);
1075
1076 /* if export is involved in recovery then clean up related things */
1077 void class_export_recovery_cleanup(struct obd_export *exp)
1078 {
1079         struct obd_device *obd = exp->exp_obd;
1080
1081         cfs_spin_lock(&obd->obd_recovery_task_lock);
1082         if (exp->exp_delayed)
1083                 obd->obd_delayed_clients--;
1084         if (obd->obd_recovering && exp->exp_in_recovery) {
1085                 cfs_spin_lock(&exp->exp_lock);
1086                 exp->exp_in_recovery = 0;
1087                 cfs_spin_unlock(&exp->exp_lock);
1088                 LASSERT(obd->obd_connected_clients);
1089                 obd->obd_connected_clients--;
1090         }
1091         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1092         /** Cleanup req replay fields */
1093         if (exp->exp_req_replay_needed) {
1094                 cfs_spin_lock(&exp->exp_lock);
1095                 exp->exp_req_replay_needed = 0;
1096                 cfs_spin_unlock(&exp->exp_lock);
1097                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1098                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1099         }
1100         /** Cleanup lock replay data */
1101         if (exp->exp_lock_replay_needed) {
1102                 cfs_spin_lock(&exp->exp_lock);
1103                 exp->exp_lock_replay_needed = 0;
1104                 cfs_spin_unlock(&exp->exp_lock);
1105                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1106                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1107         }
1108 }
1109
1110 /* This function removes 1-3 references from the export:
1111  * 1 - for export pointer passed
1112  * and if disconnect really need
1113  * 2 - removing from hash
1114  * 3 - in client_unlink_export
1115  * The export pointer passed to this function can destroyed */
1116 int class_disconnect(struct obd_export *export)
1117 {
1118         int already_disconnected;
1119         ENTRY;
1120
1121         if (export == NULL) {
1122                 fixme();
1123                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1124                 RETURN(-EINVAL);
1125         }
1126
1127         cfs_spin_lock(&export->exp_lock);
1128         already_disconnected = export->exp_disconnected;
1129         export->exp_disconnected = 1;
1130         cfs_spin_unlock(&export->exp_lock);
1131
1132         /* class_cleanup(), abort_recovery(), and class_fail_export()
1133          * all end up in here, and if any of them race we shouldn't
1134          * call extra class_export_puts(). */
1135         if (already_disconnected) {
1136                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1137                 GOTO(no_disconn, already_disconnected);
1138         }
1139
1140         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1141                export->exp_handle.h_cookie);
1142
1143         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1144                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1145                              &export->exp_connection->c_peer.nid,
1146                              &export->exp_nid_hash);
1147
1148         class_export_recovery_cleanup(export);
1149         class_unlink_export(export);
1150 no_disconn:
1151         class_export_put(export);
1152         RETURN(0);
1153 }
1154
1155 /* Return non-zero for a fully connected export */
1156 int class_connected_export(struct obd_export *exp)
1157 {
1158         if (exp) {
1159                 int connected;
1160                 cfs_spin_lock(&exp->exp_lock);
1161                 connected = (exp->exp_conn_cnt > 0);
1162                 cfs_spin_unlock(&exp->exp_lock);
1163                 return connected;
1164         }
1165         return 0;
1166 }
1167 EXPORT_SYMBOL(class_connected_export);
1168
1169 static void class_disconnect_export_list(cfs_list_t *list,
1170                                          enum obd_option flags)
1171 {
1172         int rc;
1173         struct obd_export *exp;
1174         ENTRY;
1175
1176         /* It's possible that an export may disconnect itself, but
1177          * nothing else will be added to this list. */
1178         while (!cfs_list_empty(list)) {
1179                 exp = cfs_list_entry(list->next, struct obd_export,
1180                                      exp_obd_chain);
1181                 /* need for safe call CDEBUG after obd_disconnect */
1182                 class_export_get(exp);
1183
1184                 cfs_spin_lock(&exp->exp_lock);
1185                 exp->exp_flags = flags;
1186                 cfs_spin_unlock(&exp->exp_lock);
1187
1188                 if (obd_uuid_equals(&exp->exp_client_uuid,
1189                                     &exp->exp_obd->obd_uuid)) {
1190                         CDEBUG(D_HA,
1191                                "exp %p export uuid == obd uuid, don't discon\n",
1192                                exp);
1193                         /* Need to delete this now so we don't end up pointing
1194                          * to work_list later when this export is cleaned up. */
1195                         cfs_list_del_init(&exp->exp_obd_chain);
1196                         class_export_put(exp);
1197                         continue;
1198                 }
1199
1200                 class_export_get(exp);
1201                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1202                        "last request at "CFS_TIME_T"\n",
1203                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1204                        exp, exp->exp_last_request_time);
1205                 /* release one export reference anyway */
1206                 rc = obd_disconnect(exp);
1207
1208                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1209                        obd_export_nid2str(exp), exp, rc);
1210                 class_export_put(exp);
1211         }
1212         EXIT;
1213 }
1214
1215 void class_disconnect_exports(struct obd_device *obd)
1216 {
1217         cfs_list_t work_list;
1218         ENTRY;
1219
1220         /* Move all of the exports from obd_exports to a work list, en masse. */
1221         CFS_INIT_LIST_HEAD(&work_list);
1222         cfs_spin_lock(&obd->obd_dev_lock);
1223         cfs_list_splice_init(&obd->obd_exports, &work_list);
1224         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1225         cfs_spin_unlock(&obd->obd_dev_lock);
1226
1227         if (!cfs_list_empty(&work_list)) {
1228                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1229                        "disconnecting them\n", obd->obd_minor, obd);
1230                 class_disconnect_export_list(&work_list,
1231                                              exp_flags_from_obd(obd));
1232         } else
1233                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1234                        obd->obd_minor, obd);
1235         EXIT;
1236 }
1237 EXPORT_SYMBOL(class_disconnect_exports);
1238
1239 /* Remove exports that have not completed recovery.
1240  */
1241 void class_disconnect_stale_exports(struct obd_device *obd,
1242                                     int (*test_export)(struct obd_export *))
1243 {
1244         cfs_list_t work_list;
1245         cfs_list_t *pos, *n;
1246         struct obd_export *exp;
1247         int evicted = 0;
1248         ENTRY;
1249
1250         CFS_INIT_LIST_HEAD(&work_list);
1251         cfs_spin_lock(&obd->obd_dev_lock);
1252         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1253                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1254                 if (test_export(exp))
1255                         continue;
1256
1257                 /* don't count self-export as client */
1258                 if (obd_uuid_equals(&exp->exp_client_uuid,
1259                                     &exp->exp_obd->obd_uuid))
1260                         continue;
1261
1262                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1263                 evicted++;
1264                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1265                        obd->obd_name, exp->exp_client_uuid.uuid,
1266                        exp->exp_connection == NULL ? "<unknown>" :
1267                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1268                 print_export_data(exp, "EVICTING", 0);
1269         }
1270         cfs_spin_unlock(&obd->obd_dev_lock);
1271
1272         if (evicted) {
1273                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1274                        obd->obd_name, evicted);
1275                 obd->obd_stale_clients += evicted;
1276         }
1277         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1278                                                  OBD_OPT_ABORT_RECOV);
1279         EXIT;
1280 }
1281 EXPORT_SYMBOL(class_disconnect_stale_exports);
1282
1283 void class_fail_export(struct obd_export *exp)
1284 {
1285         int rc, already_failed;
1286
1287         cfs_spin_lock(&exp->exp_lock);
1288         already_failed = exp->exp_failed;
1289         exp->exp_failed = 1;
1290         cfs_spin_unlock(&exp->exp_lock);
1291
1292         if (already_failed) {
1293                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1294                        exp, exp->exp_client_uuid.uuid);
1295                 return;
1296         }
1297
1298         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1299                exp, exp->exp_client_uuid.uuid);
1300
1301         if (obd_dump_on_timeout)
1302                 libcfs_debug_dumplog();
1303
1304         /* Most callers into obd_disconnect are removing their own reference
1305          * (request, for example) in addition to the one from the hash table.
1306          * We don't have such a reference here, so make one. */
1307         class_export_get(exp);
1308         rc = obd_disconnect(exp);
1309         if (rc)
1310                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1311         else
1312                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1313                        exp, exp->exp_client_uuid.uuid);
1314 }
1315 EXPORT_SYMBOL(class_fail_export);
1316
1317 char *obd_export_nid2str(struct obd_export *exp)
1318 {
1319         if (exp->exp_connection != NULL)
1320                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1321
1322         return "(no nid)";
1323 }
1324 EXPORT_SYMBOL(obd_export_nid2str);
1325
1326 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1327 {
1328         struct obd_export *doomed_exp = NULL;
1329         int exports_evicted = 0;
1330
1331         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1332
1333         do {
1334                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1335                 if (doomed_exp == NULL)
1336                         break;
1337
1338                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1339                          "nid %s found, wanted nid %s, requested nid %s\n",
1340                          obd_export_nid2str(doomed_exp),
1341                          libcfs_nid2str(nid_key), nid);
1342                 LASSERTF(doomed_exp != obd->obd_self_export,
1343                          "self-export is hashed by NID?\n");
1344                 exports_evicted++;
1345                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1346                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1347                        exports_evicted);
1348                 class_fail_export(doomed_exp);
1349                 class_export_put(doomed_exp);
1350         } while (1);
1351
1352         if (!exports_evicted)
1353                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1354                        obd->obd_name, nid);
1355         return exports_evicted;
1356 }
1357 EXPORT_SYMBOL(obd_export_evict_by_nid);
1358
1359 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1360 {
1361         struct obd_export *doomed_exp = NULL;
1362         struct obd_uuid doomed_uuid;
1363         int exports_evicted = 0;
1364
1365         obd_str2uuid(&doomed_uuid, uuid);
1366         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1367                 CERROR("%s: can't evict myself\n", obd->obd_name);
1368                 return exports_evicted;
1369         }
1370
1371         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1372
1373         if (doomed_exp == NULL) {
1374                 CERROR("%s: can't disconnect %s: no exports found\n",
1375                        obd->obd_name, uuid);
1376         } else {
1377                 CWARN("%s: evicting %s at adminstrative request\n",
1378                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1379                 class_fail_export(doomed_exp);
1380                 class_export_put(doomed_exp);
1381                 exports_evicted++;
1382         }
1383
1384         return exports_evicted;
1385 }
1386 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1387
1388 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1389 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1390 EXPORT_SYMBOL(class_export_dump_hook);
1391 #endif
1392
1393 static void print_export_data(struct obd_export *exp, const char *status,
1394                               int locks)
1395 {
1396         struct ptlrpc_reply_state *rs;
1397         struct ptlrpc_reply_state *first_reply = NULL;
1398         int nreplies = 0;
1399
1400         cfs_spin_lock(&exp->exp_lock);
1401         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1402                                 rs_exp_list) {
1403                 if (nreplies == 0)
1404                         first_reply = rs;
1405                 nreplies++;
1406         }
1407         cfs_spin_unlock(&exp->exp_lock);
1408
1409         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1410                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1411                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1412                cfs_atomic_read(&exp->exp_rpc_count),
1413                cfs_atomic_read(&exp->exp_cb_count),
1414                cfs_atomic_read(&exp->exp_locks_count),
1415                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1416                nreplies, first_reply, nreplies > 3 ? "..." : "",
1417                exp->exp_last_committed);
1418 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1419         if (locks && class_export_dump_hook != NULL)
1420                 class_export_dump_hook(exp);
1421 #endif
1422 }
1423
1424 void dump_exports(struct obd_device *obd, int locks)
1425 {
1426         struct obd_export *exp;
1427
1428         cfs_spin_lock(&obd->obd_dev_lock);
1429         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1430                 print_export_data(exp, "ACTIVE", locks);
1431         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1432                 print_export_data(exp, "UNLINKED", locks);
1433         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1434                 print_export_data(exp, "DELAYED", locks);
1435         cfs_spin_unlock(&obd->obd_dev_lock);
1436         cfs_spin_lock(&obd_zombie_impexp_lock);
1437         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1438                 print_export_data(exp, "ZOMBIE", locks);
1439         cfs_spin_unlock(&obd_zombie_impexp_lock);
1440 }
1441 EXPORT_SYMBOL(dump_exports);
1442
1443 void obd_exports_barrier(struct obd_device *obd)
1444 {
1445         int waited = 2;
1446         LASSERT(cfs_list_empty(&obd->obd_exports));
1447         cfs_spin_lock(&obd->obd_dev_lock);
1448         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1449                 cfs_spin_unlock(&obd->obd_dev_lock);
1450                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1451                                                    cfs_time_seconds(waited));
1452                 if (waited > 5 && IS_PO2(waited)) {
1453                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1454                                       "more than %d seconds. "
1455                                       "The obd refcount = %d. Is it stuck?\n",
1456                                       obd->obd_name, waited,
1457                                       cfs_atomic_read(&obd->obd_refcount));
1458                         dump_exports(obd, 1);
1459                 }
1460                 waited *= 2;
1461                 cfs_spin_lock(&obd->obd_dev_lock);
1462         }
1463         cfs_spin_unlock(&obd->obd_dev_lock);
1464 }
1465 EXPORT_SYMBOL(obd_exports_barrier);
1466
/* Number of zombie imports and exports queued and not yet destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1469
1470 /**
1471  * kill zombie imports and exports
1472  */
1473 void obd_zombie_impexp_cull(void)
1474 {
1475         struct obd_import *import;
1476         struct obd_export *export;
1477         ENTRY;
1478
1479         do {
1480                 cfs_spin_lock(&obd_zombie_impexp_lock);
1481
1482                 import = NULL;
1483                 if (!cfs_list_empty(&obd_zombie_imports)) {
1484                         import = cfs_list_entry(obd_zombie_imports.next,
1485                                                 struct obd_import,
1486                                                 imp_zombie_chain);
1487                         cfs_list_del_init(&import->imp_zombie_chain);
1488                 }
1489
1490                 export = NULL;
1491                 if (!cfs_list_empty(&obd_zombie_exports)) {
1492                         export = cfs_list_entry(obd_zombie_exports.next,
1493                                                 struct obd_export,
1494                                                 exp_obd_chain);
1495                         cfs_list_del_init(&export->exp_obd_chain);
1496                 }
1497
1498                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1499
1500                 if (import != NULL) {
1501                         class_import_destroy(import);
1502                         cfs_spin_lock(&obd_zombie_impexp_lock);
1503                         zombies_count--;
1504                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1505                 }
1506
1507                 if (export != NULL) {
1508                         class_export_destroy(export);
1509                         cfs_spin_lock(&obd_zombie_impexp_lock);
1510                         zombies_count--;
1511                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1512                 }
1513
1514                 cfs_cond_resched();
1515         } while (import != NULL || export != NULL);
1516         EXIT;
1517 }
1518
1519 static cfs_completion_t         obd_zombie_start;
1520 static cfs_completion_t         obd_zombie_stop;
1521 static unsigned long            obd_zombie_flags;
1522 static cfs_waitq_t              obd_zombie_waitq;
1523 static pid_t                    obd_zombie_pid;
1524
1525 enum {
1526         OBD_ZOMBIE_STOP   = 1 << 1
1527 };
1528
1529 /**
1530  * check for work for kill zombie import/export thread.
1531  */
1532 static int obd_zombie_impexp_check(void *arg)
1533 {
1534         int rc;
1535
1536         cfs_spin_lock(&obd_zombie_impexp_lock);
1537         rc = (zombies_count == 0) &&
1538              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1539         cfs_spin_unlock(&obd_zombie_impexp_lock);
1540
1541         RETURN(rc);
1542 }
1543
1544 /**
1545  * Add export to the obd_zombe thread and notify it.
1546  */
1547 static void obd_zombie_export_add(struct obd_export *exp) {
1548         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1549         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1550         cfs_list_del_init(&exp->exp_obd_chain);
1551         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1552         cfs_spin_lock(&obd_zombie_impexp_lock);
1553         zombies_count++;
1554         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1555         cfs_spin_unlock(&obd_zombie_impexp_lock);
1556
1557         if (obd_zombie_impexp_notify != NULL)
1558                 obd_zombie_impexp_notify();
1559 }
1560
1561 /**
1562  * Add import to the obd_zombe thread and notify it.
1563  */
1564 static void obd_zombie_import_add(struct obd_import *imp) {
1565         LASSERT(imp->imp_sec == NULL);
1566         cfs_spin_lock(&obd_zombie_impexp_lock);
1567         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1568         zombies_count++;
1569         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1570         cfs_spin_unlock(&obd_zombie_impexp_lock);
1571
1572         if (obd_zombie_impexp_notify != NULL)
1573                 obd_zombie_impexp_notify();
1574 }
1575
1576 /**
1577  * notify import/export destroy thread about new zombie.
1578  */
1579 static void obd_zombie_impexp_notify(void)
1580 {
1581         cfs_waitq_signal(&obd_zombie_waitq);
1582 }
1583
1584 /**
1585  * check whether obd_zombie is idle
1586  */
1587 static int obd_zombie_is_idle(void)
1588 {
1589         int rc;
1590
1591         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1592         cfs_spin_lock(&obd_zombie_impexp_lock);
1593         rc = (zombies_count == 0);
1594         cfs_spin_unlock(&obd_zombie_impexp_lock);
1595         return rc;
1596 }
1597
1598 /**
1599  * wait when obd_zombie import/export queues become empty
1600  */
1601 void obd_zombie_barrier(void)
1602 {
1603         struct l_wait_info lwi = { 0 };
1604
1605         if (obd_zombie_pid == cfs_curproc_pid())
1606                 /* don't wait for myself */
1607                 return;
1608         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1609 }
1610 EXPORT_SYMBOL(obd_zombie_barrier);
1611
1612 #ifdef __KERNEL__
1613
1614 /**
1615  * destroy zombie export/import thread.
1616  */
1617 static int obd_zombie_impexp_thread(void *unused)
1618 {
1619         int rc;
1620
1621         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1622                 cfs_complete(&obd_zombie_start);
1623                 RETURN(rc);
1624         }
1625
1626         cfs_complete(&obd_zombie_start);
1627
1628         obd_zombie_pid = cfs_curproc_pid();
1629
1630         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1631                 struct l_wait_info lwi = { 0 };
1632
1633                 l_wait_event(obd_zombie_waitq,
1634                              !obd_zombie_impexp_check(NULL), &lwi);
1635                 obd_zombie_impexp_cull();
1636
1637                 /*
1638                  * Notify obd_zombie_barrier callers that queues
1639                  * may be empty.
1640                  */
1641                 cfs_waitq_signal(&obd_zombie_waitq);
1642         }
1643
1644         cfs_complete(&obd_zombie_stop);
1645
1646         RETURN(0);
1647 }
1648
1649 #else /* ! KERNEL */
1650
1651 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1652 static void *obd_zombie_impexp_work_cb;
1653 static void *obd_zombie_impexp_idle_cb;
1654
1655 int obd_zombie_impexp_kill(void *arg)
1656 {
1657         int rc = 0;
1658
1659         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1660                 obd_zombie_impexp_cull();
1661                 rc = 1;
1662         }
1663         cfs_atomic_dec(&zombie_recur);
1664         return rc;
1665 }
1666
1667 #endif
1668
1669 /**
1670  * start destroy zombie import/export thread
1671  */
1672 int obd_zombie_impexp_init(void)
1673 {
1674         int rc;
1675
1676         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1677         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1678         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1679         cfs_init_completion(&obd_zombie_start);
1680         cfs_init_completion(&obd_zombie_stop);
1681         cfs_waitq_init(&obd_zombie_waitq);
1682         obd_zombie_pid = 0;
1683
1684 #ifdef __KERNEL__
1685         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1686         if (rc < 0)
1687                 RETURN(rc);
1688
1689         cfs_wait_for_completion(&obd_zombie_start);
1690 #else
1691
1692         obd_zombie_impexp_work_cb =
1693                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1694                                                  &obd_zombie_impexp_kill, NULL);
1695
1696         obd_zombie_impexp_idle_cb =
1697                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1698                                                  &obd_zombie_impexp_check, NULL);
1699         rc = 0;
1700 #endif
1701         RETURN(rc);
1702 }
1703 /**
1704  * stop destroy zombie import/export thread
1705  */
1706 void obd_zombie_impexp_stop(void)
1707 {
1708         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1709         obd_zombie_impexp_notify();
1710 #ifdef __KERNEL__
1711         cfs_wait_for_completion(&obd_zombie_stop);
1712 #else
1713         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1714         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1715 #endif
1716 }
1717
1718 /***** Kernel-userspace comm helpers *******/
1719
1720 /* Get length of entire message, including header */
1721 int kuc_len(int payload_len)
1722 {
1723         return sizeof(struct kuc_hdr) + payload_len;
1724 }
1725 EXPORT_SYMBOL(kuc_len);
1726
1727 /* Get a pointer to kuc header, given a ptr to the payload
1728  * @param p Pointer to payload area
1729  * @returns Pointer to kuc header
1730  */
1731 struct kuc_hdr * kuc_ptr(void *p)
1732 {
1733         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1734         LASSERT(lh->kuc_magic == KUC_MAGIC);
1735         return lh;
1736 }
1737 EXPORT_SYMBOL(kuc_ptr);
1738
1739 /* Test if payload is part of kuc message
1740  * @param p Pointer to payload area
1741  * @returns boolean
1742  */
1743 int kuc_ispayload(void *p)
1744 {
1745         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1746
1747         if (kh->kuc_magic == KUC_MAGIC)
1748                 return 1;
1749         else
1750                 return 0;
1751 }
1752 EXPORT_SYMBOL(kuc_ispayload);
1753
1754 /* Alloc space for a message, and fill in header
1755  * @return Pointer to payload area
1756  */
1757 void *kuc_alloc(int payload_len, int transport, int type)
1758 {
1759         struct kuc_hdr *lh;
1760         int len = kuc_len(payload_len);
1761
1762         OBD_ALLOC(lh, len);
1763         if (lh == NULL)
1764                 return ERR_PTR(-ENOMEM);
1765
1766         lh->kuc_magic = KUC_MAGIC;
1767         lh->kuc_transport = transport;
1768         lh->kuc_msgtype = type;
1769         lh->kuc_msglen = len;
1770
1771         return (void *)(lh + 1);
1772 }
1773 EXPORT_SYMBOL(kuc_alloc);
1774
/* Free a kuc message, header included
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area; it is passed to
 *        kuc_len() to compute the size handed to OBD_FREE
 *
 * Not declared inline: the function is exported, and under C99 inline
 * semantics a plain "inline" definition does not provide the external
 * definition that other translation units link against.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1782
1783
1784