Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82 EXPORT_SYMBOL(obd_device_alloc);
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef CONFIG_KMOD
120         if (!type) {
121                 const char *modname = name;
122                 if (!request_module(modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 try_module_get(type->typ_dt_ops->o_owner);
135                 spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139
140 void class_put_type(struct obd_type *type)
141 {
142         LASSERT(type);
143         spin_lock(&type->obd_type_lock);
144         type->typ_refcnt--;
145         module_put(type->typ_dt_ops->o_owner);
146         spin_unlock(&type->obd_type_lock);
147 }
148
149 #define CLASS_MAX_NAME 1024
150
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152                         struct lprocfs_vars *vars, const char *name,
153                         struct lu_device_type *ldt)
154 {
155         struct obd_type *type;
156         int rc = 0;
157         ENTRY;
158
159         /* sanity check */
160         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
161
162         if (class_search_type(name)) {
163                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
164                 RETURN(-EEXIST);
165         }
166
167         rc = -ENOMEM;
168         OBD_ALLOC(type, sizeof(*type));
169         if (type == NULL)
170                 RETURN(rc);
171
172         OBD_ALLOC_PTR(type->typ_dt_ops);
173         OBD_ALLOC_PTR(type->typ_md_ops);
174         OBD_ALLOC(type->typ_name, strlen(name) + 1);
175
176         if (type->typ_dt_ops == NULL ||
177             type->typ_md_ops == NULL ||
178             type->typ_name == NULL)
179                 GOTO (failed, rc);
180
181         *(type->typ_dt_ops) = *dt_ops;
182         /* md_ops is optional */
183         if (md_ops)
184                 *(type->typ_md_ops) = *md_ops;
185         strcpy(type->typ_name, name);
186         spin_lock_init(&type->obd_type_lock);
187
188 #ifdef LPROCFS
189         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
190                                               vars, type);
191         if (IS_ERR(type->typ_procroot)) {
192                 rc = PTR_ERR(type->typ_procroot);
193                 type->typ_procroot = NULL;
194                 GOTO (failed, rc);
195         }
196 #endif
197         if (ldt != NULL) {
198                 type->typ_lu = ldt;
199                 rc = lu_device_type_init(ldt);
200                 if (rc != 0)
201                         GOTO (failed, rc);
202         }
203
204         spin_lock(&obd_types_lock);
205         list_add(&type->typ_chain, &obd_types);
206         spin_unlock(&obd_types_lock);
207
208         RETURN (0);
209
210  failed:
211         if (type->typ_name != NULL)
212                 OBD_FREE(type->typ_name, strlen(name) + 1);
213         if (type->typ_md_ops != NULL)
214                 OBD_FREE_PTR(type->typ_md_ops);
215         if (type->typ_dt_ops != NULL)
216                 OBD_FREE_PTR(type->typ_dt_ops);
217         OBD_FREE(type, sizeof(*type));
218         RETURN(rc);
219 }
220
221 int class_unregister_type(const char *name)
222 {
223         struct obd_type *type = class_search_type(name);
224         ENTRY;
225
226         if (!type) {
227                 CERROR("unknown obd type\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (type->typ_refcnt) {
232                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233                 /* This is a bad situation, let's make the best of it */
234                 /* Remove ops, but leave the name for debugging */
235                 OBD_FREE_PTR(type->typ_dt_ops);
236                 OBD_FREE_PTR(type->typ_md_ops);
237                 RETURN(-EBUSY);
238         }
239
240         if (type->typ_procroot) {
241                 lprocfs_remove(&type->typ_procroot);
242         }
243
244         if (type->typ_lu)
245                 lu_device_type_fini(type->typ_lu);
246
247         spin_lock(&obd_types_lock);
248         list_del(&type->typ_chain);
249         spin_unlock(&obd_types_lock);
250         OBD_FREE(type->typ_name, strlen(name) + 1);
251         if (type->typ_dt_ops != NULL)
252                 OBD_FREE_PTR(type->typ_dt_ops);
253         if (type->typ_md_ops != NULL)
254                 OBD_FREE_PTR(type->typ_md_ops);
255         OBD_FREE(type, sizeof(*type));
256         RETURN(0);
257 } /* class_unregister_type */
258
259 /**
260  * Create a new obd device.
261  *
262  * Find an empty slot in ::obd_devs[], create a new obd device in it.
263  *
264  * \param typename [in] obd device type string.
265  * \param name     [in] obd device name.
266  *
267  * \retval NULL if create fails, otherwise return the obd device
268  *         pointer created.
269  */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
271 {
272         struct obd_device *result = NULL;
273         struct obd_device *newdev;
274         struct obd_type *type = NULL;
275         int i;
276         int new_obd_minor = 0;
277
278         if (strlen(name) >= MAX_OBD_NAME) {
279                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280                 RETURN(ERR_PTR(-EINVAL));
281         }
282
283         type = class_get_type(type_name);
284         if (type == NULL){
285                 CERROR("OBD: unknown type: %s\n", type_name);
286                 RETURN(ERR_PTR(-ENODEV));
287         }
288
289         newdev = obd_device_alloc();
290         if (newdev == NULL) {
291                 class_put_type(type);
292                 RETURN(ERR_PTR(-ENOMEM));
293         }
294         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
295
296         spin_lock(&obd_dev_lock);
297         for (i = 0; i < class_devno_max(); i++) {
298                 struct obd_device *obd = class_num2obd(i);
299                 if (obd && obd->obd_name &&
300                     (strcmp(name, obd->obd_name) == 0)) {
301                         CERROR("Device %s already exists, won't add\n", name);
302                         if (result) {
303                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304                                          "%p obd_magic %08x != %08x\n", result,
305                                          result->obd_magic, OBD_DEVICE_MAGIC);
306                                 LASSERTF(result->obd_minor == new_obd_minor,
307                                          "%p obd_minor %d != %d\n", result,
308                                          result->obd_minor, new_obd_minor);
309
310                                 obd_devs[result->obd_minor] = NULL;
311                                 result->obd_name[0]='\0';
312                          }
313                         result = ERR_PTR(-EEXIST);
314                         break;
315                 }
316                 if (!result && !obd) {
317                         result = newdev;
318                         result->obd_minor = i;
319                         new_obd_minor = i;
320                         result->obd_type = type;
321                         strncpy(result->obd_name, name,
322                                 sizeof(result->obd_name) - 1);
323                         obd_devs[i] = result;
324                 }
325         }
326         spin_unlock(&obd_dev_lock);
327
328         if (result == NULL && i >= class_devno_max()) {
329                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
330                        class_devno_max());
331                 result = ERR_PTR(-EOVERFLOW);
332         }
333
334         if (IS_ERR(result)) {
335                 obd_device_free(newdev);
336                 class_put_type(type);
337         } else {
338                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339                        result->obd_name, result);
340         }
341         return result;
342 }
343
344 void class_release_dev(struct obd_device *obd)
345 {
346         struct obd_type *obd_type = obd->obd_type;
347
348         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352         LASSERT(obd_type != NULL);
353
354         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355                obd->obd_name,obd->obd_type->typ_name);
356
357         spin_lock(&obd_dev_lock);
358         obd_devs[obd->obd_minor] = NULL;
359         spin_unlock(&obd_dev_lock);
360         obd_device_free(obd);
361
362         class_put_type(obd_type);
363 }
364
365 int class_name2dev(const char *name)
366 {
367         int i;
368
369         if (!name)
370                 return -1;
371
372         spin_lock(&obd_dev_lock);
373         for (i = 0; i < class_devno_max(); i++) {
374                 struct obd_device *obd = class_num2obd(i);
375                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376                         /* Make sure we finished attaching before we give
377                            out any references */
378                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379                         if (obd->obd_attached) {
380                                 spin_unlock(&obd_dev_lock);
381                                 return i;
382                         }
383                         break;
384                 }
385         }
386         spin_unlock(&obd_dev_lock);
387
388         return -1;
389 }
390
391 struct obd_device *class_name2obd(const char *name)
392 {
393         int dev = class_name2dev(name);
394
395         if (dev < 0 || dev > class_devno_max())
396                 return NULL;
397         return class_num2obd(dev);
398 }
399
400 int class_uuid2dev(struct obd_uuid *uuid)
401 {
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409                         spin_unlock(&obd_dev_lock);
410                         return i;
411                 }
412         }
413         spin_unlock(&obd_dev_lock);
414
415         return -1;
416 }
417
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
419 {
420         int dev = class_uuid2dev(uuid);
421         if (dev < 0)
422                 return NULL;
423         return class_num2obd(dev);
424 }
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *         otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (obd == NULL)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453
454 void class_obd_list(void)
455 {
456         char *status;
457         int i;
458
459         spin_lock(&obd_dev_lock);
460         for (i = 0; i < class_devno_max(); i++) {
461                 struct obd_device *obd = class_num2obd(i);
462                 if (obd == NULL)
463                         continue;
464                 if (obd->obd_stopping)
465                         status = "ST";
466                 else if (obd->obd_set_up)
467                         status = "UP";
468                 else if (obd->obd_attached)
469                         status = "AT";
470                 else
471                         status = "--";
472                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473                          i, status, obd->obd_type->typ_name,
474                          obd->obd_name, obd->obd_uuid.uuid,
475                          atomic_read(&obd->obd_refcount));
476         }
477         spin_unlock(&obd_dev_lock);
478         return;
479 }
480
481 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
482    specified, then only the client with that uuid is returned,
483    otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485                                           const char * typ_name,
486                                           struct obd_uuid *grp_uuid)
487 {
488         int i;
489
490         spin_lock(&obd_dev_lock);
491         for (i = 0; i < class_devno_max(); i++) {
492                 struct obd_device *obd = class_num2obd(i);
493                 if (obd == NULL)
494                         continue;
495                 if ((strncmp(obd->obd_type->typ_name, typ_name,
496                              strlen(typ_name)) == 0)) {
497                         if (obd_uuid_equals(tgt_uuid,
498                                             &obd->u.cli.cl_target_uuid) &&
499                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
500                                                          &obd->obd_uuid) : 1)) {
501                                 spin_unlock(&obd_dev_lock);
502                                 return obd;
503                         }
504                 }
505         }
506         spin_unlock(&obd_dev_lock);
507
508         return NULL;
509 }
510
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512    searching at *next, and if a device is found, the next index to look
513    at is saved in *next. If next is NULL, then the first matching device
514    will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
516 {
517         int i;
518
519         if (next == NULL)
520                 i = 0;
521         else if (*next >= 0 && *next < class_devno_max())
522                 i = *next;
523         else
524                 return NULL;
525
526         spin_lock(&obd_dev_lock);
527         for (; i < class_devno_max(); i++) {
528                 struct obd_device *obd = class_num2obd(i);
529                 if (obd == NULL)
530                         continue;
531                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532                         if (next != NULL)
533                                 *next = i+1;
534                         spin_unlock(&obd_dev_lock);
535                         return obd;
536                 }
537         }
538         spin_unlock(&obd_dev_lock);
539
540         return NULL;
541 }
542
543 /**
544  * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545  * adjust sptlrpc settings accordingly.
546  */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
548 {
549         struct obd_device  *obd;
550         const char         *type;
551         int                 i, rc = 0, rc2;
552
553         LASSERT(namelen > 0);
554
555         spin_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 obd = class_num2obd(i);
558
559                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560                         continue;
561
562                 /* only notify mdc, osc, mdt, ost */
563                 type = obd->obd_type->typ_name;
564                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OST_NAME) != 0)
568                         continue;
569
570                 if (strncmp(obd->obd_name, fsname, namelen))
571                         continue;
572
573                 class_incref(obd, __FUNCTION__, obd);
574                 spin_unlock(&obd_dev_lock);
575                 rc2 = obd_set_info_async(obd->obd_self_export,
576                                          sizeof(KEY_SPTLRPC_CONF),
577                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
578                 rc = rc ? rc : rc2;
579                 class_decref(obd, __FUNCTION__, obd);
580                 spin_lock(&obd_dev_lock);
581         }
582         spin_unlock(&obd_dev_lock);
583         return rc;
584 }
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
586
587 void obd_cleanup_caches(void)
588 {
589         int rc;
590
591         ENTRY;
592         if (obd_device_cachep) {
593                 rc = cfs_mem_cache_destroy(obd_device_cachep);
594                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595                 obd_device_cachep = NULL;
596         }
597         if (obdo_cachep) {
598                 rc = cfs_mem_cache_destroy(obdo_cachep);
599                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
600                 obdo_cachep = NULL;
601         }
602         if (import_cachep) {
603                 rc = cfs_mem_cache_destroy(import_cachep);
604                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605                 import_cachep = NULL;
606         }
607         if (capa_cachep) {
608                 rc = cfs_mem_cache_destroy(capa_cachep);
609                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
610                 capa_cachep = NULL;
611         }
612         EXIT;
613 }
614
615 int obd_init_caches(void)
616 {
617         ENTRY;
618
619         LASSERT(obd_device_cachep == NULL);
620         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621                                                  sizeof(struct obd_device),
622                                                  0, 0);
623         if (!obd_device_cachep)
624                 GOTO(out, -ENOMEM);
625
626         LASSERT(obdo_cachep == NULL);
627         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
628                                            0, 0);
629         if (!obdo_cachep)
630                 GOTO(out, -ENOMEM);
631
632         LASSERT(import_cachep == NULL);
633         import_cachep = cfs_mem_cache_create("ll_import_cache",
634                                              sizeof(struct obd_import),
635                                              0, 0);
636         if (!import_cachep)
637                 GOTO(out, -ENOMEM);
638
639         LASSERT(capa_cachep == NULL);
640         capa_cachep = cfs_mem_cache_create("capa_cache",
641                                            sizeof(struct obd_capa), 0, 0);
642         if (!capa_cachep)
643                 GOTO(out, -ENOMEM);
644
645         RETURN(0);
646  out:
647         obd_cleanup_caches();
648         RETURN(-ENOMEM);
649
650 }
651
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
654 {
655         struct obd_export *export;
656         ENTRY;
657
658         if (!conn) {
659                 CDEBUG(D_CACHE, "looking for null handle\n");
660                 RETURN(NULL);
661         }
662
663         if (conn->cookie == -1) {  /* this means assign a new connection */
664                 CDEBUG(D_CACHE, "want a new connection\n");
665                 RETURN(NULL);
666         }
667
668         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669         export = class_handle2object(conn->cookie);
670         RETURN(export);
671 }
672
673 struct obd_device *class_exp2obd(struct obd_export *exp)
674 {
675         if (exp)
676                 return exp->exp_obd;
677         return NULL;
678 }
679
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
681 {
682         struct obd_export *export;
683         export = class_conn2export(conn);
684         if (export) {
685                 struct obd_device *obd = export->exp_obd;
686                 class_export_put(export);
687                 return obd;
688         }
689         return NULL;
690 }
691
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
693 {
694         struct obd_device *obd = exp->exp_obd;
695         if (obd == NULL)
696                 return NULL;
697         return obd->u.cli.cl_import;
698 }
699
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
701 {
702         struct obd_device *obd = class_conn2obd(conn);
703         if (obd == NULL)
704                 return NULL;
705         return obd->u.cli.cl_import;
706 }
707
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
710 {
711         struct obd_device *obd = exp->exp_obd;
712         ENTRY;
713
714         LASSERT (atomic_read(&exp->exp_refcount) == 0);
715
716         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717                exp->exp_client_uuid.uuid, obd->obd_name);
718
719         LASSERT(obd != NULL);
720
721         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722         if (exp->exp_connection)
723                 ptlrpc_put_connection_superhack(exp->exp_connection);
724
725         LASSERT(list_empty(&exp->exp_outstanding_replies));
726         LASSERT(list_empty(&exp->exp_uncommitted_replies));
727         LASSERT(list_empty(&exp->exp_req_replay_queue));
728         LASSERT(list_empty(&exp->exp_queued_rpc));
729         obd_destroy_export(exp);
730         class_decref(obd, "export", exp);
731
732         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
733         EXIT;
734 }
735
736 static void export_handle_addref(void *export)
737 {
738         class_export_get(export);
739 }
740
741 struct obd_export *class_export_get(struct obd_export *exp)
742 {
743         atomic_inc(&exp->exp_refcount);
744         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745                atomic_read(&exp->exp_refcount));
746         return exp;
747 }
748 EXPORT_SYMBOL(class_export_get);
749
750 void class_export_put(struct obd_export *exp)
751 {
752         LASSERT(exp != NULL);
753         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754                atomic_read(&exp->exp_refcount) - 1);
755         LASSERT(atomic_read(&exp->exp_refcount) > 0);
756         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
757
758         if (atomic_dec_and_test(&exp->exp_refcount)) {
759                 LASSERT(!list_empty(&exp->exp_obd_chain));
760                 CDEBUG(D_IOCTL, "final put %p/%s\n",
761                        exp, exp->exp_client_uuid.uuid);
762                 obd_zombie_export_add(exp);
763         }
764 }
765 EXPORT_SYMBOL(class_export_put);
766
767 /* Creates a new export, adds it to the hash table, and returns a
768  * pointer to it. The refcount is 2: one for the hash reference, and
769  * one for the pointer returned by this function. */
770 struct obd_export *class_new_export(struct obd_device *obd,
771                                     struct obd_uuid *cluuid)
772 {
773         struct obd_export *export;
774         int rc = 0;
775
776         OBD_ALLOC_PTR(export);
777         if (!export)
778                 return ERR_PTR(-ENOMEM);
779
780         export->exp_conn_cnt = 0;
781         export->exp_lock_hash = NULL;
782         atomic_set(&export->exp_refcount, 2);
783         atomic_set(&export->exp_rpc_count, 0);
784         atomic_set(&export->exp_cb_count, 0);
785         atomic_set(&export->exp_locks_count, 0);
786         export->exp_obd = obd;
787         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
788         spin_lock_init(&export->exp_uncommitted_replies_lock);
789         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
790         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
791         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
792         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
793         class_handle_hash(&export->exp_handle, export_handle_addref);
794         export->exp_last_request_time = cfs_time_current_sec();
795         spin_lock_init(&export->exp_lock);
796         INIT_HLIST_NODE(&export->exp_uuid_hash);
797         INIT_HLIST_NODE(&export->exp_nid_hash);
798
799         export->exp_sp_peer = LUSTRE_SP_ANY;
800         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
801         export->exp_client_uuid = *cluuid;
802         obd_init_export(export);
803
804         spin_lock(&obd->obd_dev_lock);
805         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
806                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
807                                             &export->exp_uuid_hash);
808                 if (rc != 0) {
809                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
810                                       obd->obd_name, cluuid->uuid, rc);
811                         spin_unlock(&obd->obd_dev_lock);
812                         class_handle_unhash(&export->exp_handle);
813                         OBD_FREE_PTR(export);
814                         return ERR_PTR(-EALREADY);
815                 }
816         }
817
818         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
819         class_incref(obd, "export", export);
820         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
821         list_add_tail(&export->exp_obd_chain_timed,
822                       &export->exp_obd->obd_exports_timed);
823         export->exp_obd->obd_num_exports++;
824         spin_unlock(&obd->obd_dev_lock);
825
826         return export;
827 }
828 EXPORT_SYMBOL(class_new_export);
829
830 void class_unlink_export(struct obd_export *exp)
831 {
832         class_handle_unhash(&exp->exp_handle);
833
834         spin_lock(&exp->exp_obd->obd_dev_lock);
835         /* delete an uuid-export hashitem from hashtables */
836         if (!hlist_unhashed(&exp->exp_uuid_hash))
837                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
838                                 &exp->exp_client_uuid,
839                                 &exp->exp_uuid_hash);
840
841         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
842         list_del_init(&exp->exp_obd_chain_timed);
843         exp->exp_obd->obd_num_exports--;
844         spin_unlock(&exp->exp_obd->obd_dev_lock);
845
846         /* Keep these counter valid always */
847         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
848         if (exp->exp_delayed)
849                 exp->exp_obd->obd_delayed_clients--;
850         else if (exp->exp_in_recovery)
851                 exp->exp_obd->obd_recoverable_clients--;
852         else if (exp->exp_obd->obd_recovering)
853                 exp->exp_obd->obd_max_recoverable_clients--;
854         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
855         class_export_put(exp);
856 }
857 EXPORT_SYMBOL(class_unlink_export);
858
859 /* Import management functions */
860 void class_import_destroy(struct obd_import *imp)
861 {
862         ENTRY;
863
864         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
865                 imp->imp_obd->obd_name);
866
867         LASSERT(atomic_read(&imp->imp_refcount) == 0);
868
869         ptlrpc_put_connection_superhack(imp->imp_connection);
870
871         while (!list_empty(&imp->imp_conn_list)) {
872                 struct obd_import_conn *imp_conn;
873
874                 imp_conn = list_entry(imp->imp_conn_list.next,
875                                       struct obd_import_conn, oic_item);
876                 list_del_init(&imp_conn->oic_item);
877                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
878                 OBD_FREE(imp_conn, sizeof(*imp_conn));
879         }
880
881         LASSERT(imp->imp_sec == NULL);
882         class_decref(imp->imp_obd, "import", imp);
883         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
884         EXIT;
885 }
886
887 static void import_handle_addref(void *import)
888 {
889         class_import_get(import);
890 }
891
892 struct obd_import *class_import_get(struct obd_import *import)
893 {
894         LASSERT(atomic_read(&import->imp_refcount) >= 0);
895         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
896         atomic_inc(&import->imp_refcount);
897         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
898                atomic_read(&import->imp_refcount), 
899                import->imp_obd->obd_name);
900         return import;
901 }
902 EXPORT_SYMBOL(class_import_get);
903
904 void class_import_put(struct obd_import *imp)
905 {
906         ENTRY;
907
908         LASSERT(atomic_read(&imp->imp_refcount) > 0);
909         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
910         LASSERT(list_empty(&imp->imp_zombie_chain));
911
912         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
913                atomic_read(&imp->imp_refcount) - 1, 
914                imp->imp_obd->obd_name);
915
916         if (atomic_dec_and_test(&imp->imp_refcount)) {
917                 CDEBUG(D_INFO, "final put import %p\n", imp);
918                 obd_zombie_import_add(imp);
919         }
920
921         EXIT;
922 }
923 EXPORT_SYMBOL(class_import_put);
924
925 static void init_imp_at(struct imp_at *at) {
926         int i;
927         at_init(&at->iat_net_latency, 0, 0);
928         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
929                 /* max service estimates are tracked on the server side, so
930                    don't use the AT history here, just use the last reported
931                    val. (But keep hist for proc histogram, worst_ever) */
932                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
933                         AT_FLG_NOHIST);
934         }
935 }
936
937 struct obd_import *class_new_import(struct obd_device *obd)
938 {
939         struct obd_import *imp;
940
941         OBD_ALLOC(imp, sizeof(*imp));
942         if (imp == NULL)
943                 return NULL;
944
945         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
946         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
947         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
948         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
949         spin_lock_init(&imp->imp_lock);
950         imp->imp_last_success_conn = 0;
951         imp->imp_state = LUSTRE_IMP_NEW;
952         imp->imp_obd = class_incref(obd, "import", imp);
953         sema_init(&imp->imp_sec_mutex, 1);
954         cfs_waitq_init(&imp->imp_recovery_waitq);
955
956         atomic_set(&imp->imp_refcount, 2);
957         atomic_set(&imp->imp_unregistering, 0);
958         atomic_set(&imp->imp_inflight, 0);
959         atomic_set(&imp->imp_replay_inflight, 0);
960         atomic_set(&imp->imp_inval_count, 0);
961         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
962         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
963         class_handle_hash(&imp->imp_handle, import_handle_addref);
964         init_imp_at(&imp->imp_at);
965
966         /* the default magic is V2, will be used in connect RPC, and
967          * then adjusted according to the flags in request/reply. */
968         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
969
970         return imp;
971 }
972 EXPORT_SYMBOL(class_new_import);
973
974 void class_destroy_import(struct obd_import *import)
975 {
976         LASSERT(import != NULL);
977         LASSERT(import != LP_POISON);
978
979         class_handle_unhash(&import->imp_handle);
980
981         spin_lock(&import->imp_lock);
982         import->imp_generation++;
983         spin_unlock(&import->imp_lock);
984         class_import_put(import);
985 }
986 EXPORT_SYMBOL(class_destroy_import);
987
988 /* A connection defines an export context in which preallocation can
989    be managed. This releases the export pointer reference, and returns
990    the export handle, so the export refcount is 1 when this function
991    returns. */
992 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
993                   struct obd_uuid *cluuid)
994 {
995         struct obd_export *export;
996         LASSERT(conn != NULL);
997         LASSERT(obd != NULL);
998         LASSERT(cluuid != NULL);
999         ENTRY;
1000
1001         export = class_new_export(obd, cluuid);
1002         if (IS_ERR(export))
1003                 RETURN(PTR_ERR(export));
1004
1005         conn->cookie = export->exp_handle.h_cookie;
1006         class_export_put(export);
1007
1008         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1009                cluuid->uuid, conn->cookie);
1010         RETURN(0);
1011 }
1012 EXPORT_SYMBOL(class_connect);
1013
1014 /* if export is involved in recovery then clean up related things */
1015 void class_export_recovery_cleanup(struct obd_export *exp)
1016 {
1017         struct obd_device *obd = exp->exp_obd;
1018
1019         spin_lock_bh(&obd->obd_processing_task_lock);
1020         if (obd->obd_recovering && exp->exp_in_recovery) {
1021                 spin_lock(&exp->exp_lock);
1022                 exp->exp_in_recovery = 0;
1023                 spin_unlock(&exp->exp_lock);
1024                 obd->obd_connected_clients--;
1025                 /* each connected client is counted as recoverable */
1026                 obd->obd_recoverable_clients--;
1027                 if (exp->exp_req_replay_needed) {
1028                         spin_lock(&exp->exp_lock);
1029                         exp->exp_req_replay_needed = 0;
1030                         spin_unlock(&exp->exp_lock);
1031                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
1032                         atomic_dec(&obd->obd_req_replay_clients);
1033                 }
1034                 if (exp->exp_lock_replay_needed) {
1035                         spin_lock(&exp->exp_lock);
1036                         exp->exp_lock_replay_needed = 0;
1037                         spin_unlock(&exp->exp_lock);
1038                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1039                         atomic_dec(&obd->obd_lock_replay_clients);
1040                 }
1041         }
1042         spin_unlock_bh(&obd->obd_processing_task_lock);
1043 }
1044
1045 /* This function removes 1-3 references from the export:
1046  * 1 - for export pointer passed
1047  * and if disconnect really need
1048  * 2 - removing from hash
1049  * 3 - in client_unlink_export
1050  * The export pointer passed to this function can destroyed */
1051 int class_disconnect(struct obd_export *export)
1052 {
1053         int already_disconnected;
1054         ENTRY;
1055
1056         if (export == NULL) {
1057                 fixme();
1058                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1059                 RETURN(-EINVAL);
1060         }
1061
1062         spin_lock(&export->exp_lock);
1063         already_disconnected = export->exp_disconnected;
1064         export->exp_disconnected = 1;
1065         spin_unlock(&export->exp_lock);
1066
1067         /* class_cleanup(), abort_recovery(), and class_fail_export()
1068          * all end up in here, and if any of them race we shouldn't
1069          * call extra class_export_puts(). */
1070         if (already_disconnected) {
1071                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1072                 GOTO(no_disconn, already_disconnected);
1073         }
1074
1075         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1076                export->exp_handle.h_cookie);
1077
1078         if (!hlist_unhashed(&export->exp_nid_hash))
1079                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1080                                 &export->exp_connection->c_peer.nid,
1081                                 &export->exp_nid_hash);
1082
1083         class_export_recovery_cleanup(export);
1084         class_unlink_export(export);
1085 no_disconn:
1086         class_export_put(export);
1087         RETURN(0);
1088 }
1089
1090 static void class_disconnect_export_list(struct list_head *list,
1091                                          enum obd_option flags)
1092 {
1093         int rc;
1094         struct obd_export *exp;
1095         ENTRY;
1096
1097         /* It's possible that an export may disconnect itself, but
1098          * nothing else will be added to this list. */
1099         while (!list_empty(list)) {
1100                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1101                 /* need for safe call CDEBUG after obd_disconnect */
1102                 class_export_get(exp);
1103
1104                 spin_lock(&exp->exp_lock);
1105                 exp->exp_flags = flags;
1106                 spin_unlock(&exp->exp_lock);
1107
1108                 if (obd_uuid_equals(&exp->exp_client_uuid,
1109                                     &exp->exp_obd->obd_uuid)) {
1110                         CDEBUG(D_HA,
1111                                "exp %p export uuid == obd uuid, don't discon\n",
1112                                exp);
1113                         /* Need to delete this now so we don't end up pointing
1114                          * to work_list later when this export is cleaned up. */
1115                         list_del_init(&exp->exp_obd_chain);
1116                         class_export_put(exp);
1117                         continue;
1118                 }
1119
1120                 class_export_get(exp);
1121                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1122                        "last request at "CFS_TIME_T"\n",
1123                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1124                        exp, exp->exp_last_request_time);
1125                 /* release one export reference anyway */
1126                 rc = obd_disconnect(exp);
1127
1128                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1129                        obd_export_nid2str(exp), exp, rc);
1130                 class_export_put(exp);
1131         }
1132         EXIT;
1133 }
1134
1135 void class_disconnect_exports(struct obd_device *obd)
1136 {
1137         struct list_head work_list;
1138         ENTRY;
1139
1140         /* Move all of the exports from obd_exports to a work list, en masse. */
1141         CFS_INIT_LIST_HEAD(&work_list);
1142         spin_lock(&obd->obd_dev_lock);
1143         list_splice_init(&obd->obd_exports, &work_list);
1144         list_splice_init(&obd->obd_delayed_exports, &work_list);
1145         spin_unlock(&obd->obd_dev_lock);
1146
1147         if (!list_empty(&work_list)) {
1148                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1149                        "disconnecting them\n", obd->obd_minor, obd);
1150                 class_disconnect_export_list(&work_list,
1151                                              exp_flags_from_obd(obd));
1152         } else
1153                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1154                        obd->obd_minor, obd);
1155         EXIT;
1156 }
1157 EXPORT_SYMBOL(class_disconnect_exports);
1158
1159 /* Remove exports that have not completed recovery.
1160  */
1161 void class_disconnect_stale_exports(struct obd_device *obd,
1162                                     int (*test_export)(struct obd_export *),
1163                                     enum obd_option flags)
1164 {
1165         struct list_head work_list;
1166         struct list_head *pos, *n;
1167         struct obd_export *exp;
1168         ENTRY;
1169
1170         CFS_INIT_LIST_HEAD(&work_list);
1171         spin_lock(&obd->obd_dev_lock);
1172         obd->obd_stale_clients = 0;
1173         list_for_each_safe(pos, n, &obd->obd_exports) {
1174                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1175                 if (test_export(exp))
1176                         continue;
1177
1178                 list_move(&exp->exp_obd_chain, &work_list);
1179                 /* don't count self-export as client */
1180                 if (obd_uuid_equals(&exp->exp_client_uuid,
1181                                      &exp->exp_obd->obd_uuid))
1182                         continue;
1183
1184                 obd->obd_stale_clients++;
1185                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1186                        obd->obd_name, exp->exp_client_uuid.uuid,
1187                        exp->exp_connection == NULL ? "<unknown>" :
1188                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1189         }
1190         spin_unlock(&obd->obd_dev_lock);
1191
1192         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1193                obd->obd_stale_clients);
1194
1195         class_disconnect_export_list(&work_list, flags);
1196         EXIT;
1197 }
1198 EXPORT_SYMBOL(class_disconnect_stale_exports);
1199
1200 void class_fail_export(struct obd_export *exp)
1201 {
1202         int rc, already_failed;
1203
1204         spin_lock(&exp->exp_lock);
1205         already_failed = exp->exp_failed;
1206         exp->exp_failed = 1;
1207         spin_unlock(&exp->exp_lock);
1208
1209         if (already_failed) {
1210                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1211                        exp, exp->exp_client_uuid.uuid);
1212                 return;
1213         }
1214
1215         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1216                exp, exp->exp_client_uuid.uuid);
1217
1218         if (obd_dump_on_timeout)
1219                 libcfs_debug_dumplog();
1220
1221         /* Most callers into obd_disconnect are removing their own reference
1222          * (request, for example) in addition to the one from the hash table.
1223          * We don't have such a reference here, so make one. */
1224         class_export_get(exp);
1225         rc = obd_disconnect(exp);
1226         if (rc)
1227                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1228         else
1229                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1230                        exp, exp->exp_client_uuid.uuid);
1231 }
1232 EXPORT_SYMBOL(class_fail_export);
1233
1234 char *obd_export_nid2str(struct obd_export *exp)
1235 {
1236         if (exp->exp_connection != NULL)
1237                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1238
1239         return "(no nid)";
1240 }
1241 EXPORT_SYMBOL(obd_export_nid2str);
1242
1243 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1244 {
1245         struct obd_export *doomed_exp = NULL;
1246         int exports_evicted = 0;
1247
1248         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1249
1250         do {
1251                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1252                 if (doomed_exp == NULL)
1253                         break;
1254
1255                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1256                          "nid %s found, wanted nid %s, requested nid %s\n",
1257                          obd_export_nid2str(doomed_exp),
1258                          libcfs_nid2str(nid_key), nid);
1259                 LASSERTF(doomed_exp != obd->obd_self_export,
1260                          "self-export is hashed by NID?\n");
1261                 exports_evicted++;
1262                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1263                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1264                        exports_evicted);
1265                 class_fail_export(doomed_exp);
1266                 class_export_put(doomed_exp);
1267         } while (1);
1268
1269         if (!exports_evicted)
1270                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1271                        obd->obd_name, nid);
1272         return exports_evicted;
1273 }
1274 EXPORT_SYMBOL(obd_export_evict_by_nid);
1275
1276 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1277 {
1278         struct obd_export *doomed_exp = NULL;
1279         struct obd_uuid doomed_uuid;
1280         int exports_evicted = 0;
1281
1282         obd_str2uuid(&doomed_uuid, uuid);
1283         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1284                 CERROR("%s: can't evict myself\n", obd->obd_name);
1285                 return exports_evicted;
1286         }
1287
1288         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1289
1290         if (doomed_exp == NULL) {
1291                 CERROR("%s: can't disconnect %s: no exports found\n",
1292                        obd->obd_name, uuid);
1293         } else {
1294                 CWARN("%s: evicting %s at adminstrative request\n",
1295                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1296                 class_fail_export(doomed_exp);
1297                 class_export_put(doomed_exp);
1298                 exports_evicted++;
1299         }
1300
1301         return exports_evicted;
1302 }
1303 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1304
1305 static void print_export_data(struct obd_export *exp, const char *status)
1306 {
1307         struct ptlrpc_reply_state *rs;
1308         struct ptlrpc_reply_state *first_reply = NULL;
1309         int nreplies = 0;
1310
1311         spin_lock(&exp->exp_lock);
1312         list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1313                 if (nreplies == 0)
1314                         first_reply = rs;
1315                 nreplies++;
1316         }
1317         spin_unlock(&exp->exp_lock);
1318
1319         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1320                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1321                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1322                atomic_read(&exp->exp_rpc_count),
1323                atomic_read(&exp->exp_cb_count),
1324                atomic_read(&exp->exp_locks_count),
1325                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1326                nreplies, first_reply, nreplies > 3 ? "..." : "",
1327                exp->exp_last_committed);
1328 }
1329
1330 void dump_exports(struct obd_device *obd)
1331 {
1332         struct obd_export *exp;
1333
1334         spin_lock(&obd->obd_dev_lock);
1335         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1336                 print_export_data(exp, "ACTIVE");
1337         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1338                 print_export_data(exp, "UNLINKED");
1339         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1340                 print_export_data(exp, "DELAYED");
1341         spin_unlock(&obd->obd_dev_lock);
1342         spin_lock(&obd_zombie_impexp_lock);
1343         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1344                 print_export_data(exp, "ZOMBIE");
1345         spin_unlock(&obd_zombie_impexp_lock);
1346 }
1347 EXPORT_SYMBOL(dump_exports);
1348
1349 void obd_exports_barrier(struct obd_device *obd)
1350 {
1351         int waited = 2;
1352         LASSERT(list_empty(&obd->obd_exports));
1353         spin_lock(&obd->obd_dev_lock);
1354         while (!list_empty(&obd->obd_unlinked_exports)) {
1355                 spin_unlock(&obd->obd_dev_lock);
1356                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1357                 if (waited > 5 && IS_PO2(waited)) {
1358                         LCONSOLE_WARN("Waiting for obd_unlinked_exports "
1359                                       "more than %d seconds. "
1360                                       "The obd refcount = %d. Is it stuck?\n",
1361                                       waited, atomic_read(&obd->obd_refcount));
1362                         dump_exports(obd);
1363                 }
1364                 waited *= 2;
1365                 spin_lock(&obd->obd_dev_lock);
1366         }
1367         spin_unlock(&obd->obd_dev_lock);
1368 }
1369 EXPORT_SYMBOL(obd_exports_barrier);
1370
1371 /**
1372  * kill zombie imports and exports
1373  */
1374 void obd_zombie_impexp_cull(void)
1375 {
1376         struct obd_import *import;
1377         struct obd_export *export;
1378         ENTRY;
1379
1380         do {
1381                 spin_lock(&obd_zombie_impexp_lock);
1382
1383                 import = NULL;
1384                 if (!list_empty(&obd_zombie_imports)) {
1385                         import = list_entry(obd_zombie_imports.next,
1386                                             struct obd_import,
1387                                             imp_zombie_chain);
1388                         list_del_init(&import->imp_zombie_chain);
1389                 }
1390
1391                 export = NULL;
1392                 if (!list_empty(&obd_zombie_exports)) {
1393                         export = list_entry(obd_zombie_exports.next,
1394                                             struct obd_export,
1395                                             exp_obd_chain);
1396                         list_del_init(&export->exp_obd_chain);
1397                 }
1398
1399                 spin_unlock(&obd_zombie_impexp_lock);
1400
1401                 if (import != NULL)
1402                         class_import_destroy(import);
1403
1404                 if (export != NULL)
1405                         class_export_destroy(export);
1406
1407         } while (import != NULL || export != NULL);
1408         EXIT;
1409 }
1410
1411 static struct completion        obd_zombie_start;
1412 static struct completion        obd_zombie_stop;
1413 static unsigned long            obd_zombie_flags;
1414 static cfs_waitq_t              obd_zombie_waitq;
1415
1416 enum {
1417         OBD_ZOMBIE_STOP   = 1 << 1
1418 };
1419
1420 /**
1421  * check for work for kill zombie import/export thread.
1422  */
1423 static int obd_zombie_impexp_check(void *arg)
1424 {
1425         int rc;
1426
1427         spin_lock(&obd_zombie_impexp_lock);
1428         rc = list_empty(&obd_zombie_imports) &&
1429              list_empty(&obd_zombie_exports) &&
1430              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1431
1432         spin_unlock(&obd_zombie_impexp_lock);
1433
1434         RETURN(rc);
1435 }
1436
1437 /**
1438  * Add export to the obd_zombe thread and notify it.
1439  */
1440 static void obd_zombie_export_add(struct obd_export *exp) {
1441         spin_lock(&exp->exp_obd->obd_dev_lock);
1442         LASSERT(!list_empty(&exp->exp_obd_chain));
1443         list_del_init(&exp->exp_obd_chain);
1444         spin_unlock(&exp->exp_obd->obd_dev_lock);
1445         spin_lock(&obd_zombie_impexp_lock);
1446         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1447         spin_unlock(&obd_zombie_impexp_lock);
1448
1449         if (obd_zombie_impexp_notify != NULL)
1450                 obd_zombie_impexp_notify();
1451 }
1452
1453 /**
1454  * Add import to the obd_zombe thread and notify it.
1455  */
1456 static void obd_zombie_import_add(struct obd_import *imp) {
1457         LASSERT(imp->imp_sec == NULL);
1458         spin_lock(&obd_zombie_impexp_lock);
1459         LASSERT(list_empty(&imp->imp_zombie_chain));
1460         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1461         spin_unlock(&obd_zombie_impexp_lock);
1462
1463         if (obd_zombie_impexp_notify != NULL)
1464                 obd_zombie_impexp_notify();
1465 }
1466
1467 /**
1468  * notify import/export destroy thread about new zombie.
1469  */
1470 static void obd_zombie_impexp_notify(void)
1471 {
1472         cfs_waitq_signal(&obd_zombie_waitq);
1473 }
1474
1475 /**
1476  * check whether obd_zombie is idle
1477  */
1478 static int obd_zombie_is_idle(void)
1479 {
1480         int rc;
1481
1482         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1483         spin_lock(&obd_zombie_impexp_lock);
1484         rc = list_empty(&obd_zombie_imports) &&
1485              list_empty(&obd_zombie_exports);
1486         spin_unlock(&obd_zombie_impexp_lock);
1487         return rc;
1488 }
1489
1490 /**
1491  * wait when obd_zombie import/export queues become empty
1492  */
1493 void obd_zombie_barrier(void)
1494 {
1495         struct l_wait_info lwi = { 0 };
1496         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1497 }
1498 EXPORT_SYMBOL(obd_zombie_barrier);
1499
1500 #ifdef __KERNEL__
1501
1502 /**
1503  * destroy zombie export/import thread.
1504  */
1505 static int obd_zombie_impexp_thread(void *unused)
1506 {
1507         int rc;
1508
1509         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1510                 complete(&obd_zombie_start);
1511                 RETURN(rc);
1512         }
1513
1514         complete(&obd_zombie_start);
1515
1516         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1517                 struct l_wait_info lwi = { 0 };
1518
1519                 l_wait_event(obd_zombie_waitq, 
1520                              !obd_zombie_impexp_check(NULL), &lwi);
1521                 obd_zombie_impexp_cull();
1522
1523                 /* 
1524                  * Notify obd_zombie_barrier callers that queues
1525                  * may be empty.
1526                  */
1527                 cfs_waitq_signal(&obd_zombie_waitq);
1528         }
1529
1530         complete(&obd_zombie_stop);
1531
1532         RETURN(0);
1533 }
1534
1535 #else /* ! KERNEL */
1536
1537 static atomic_t zombie_recur = ATOMIC_INIT(0);
1538 static void *obd_zombie_impexp_work_cb;
1539 static void *obd_zombie_impexp_idle_cb;
1540
1541 int obd_zombie_impexp_kill(void *arg)
1542 {
1543         int rc = 0;
1544
1545         if (atomic_inc_return(&zombie_recur) == 1) {
1546                 obd_zombie_impexp_cull();
1547                 rc = 1;
1548         }
1549         atomic_dec(&zombie_recur);
1550         return rc;
1551 }
1552
1553 #endif
1554
1555 /**
1556  * start destroy zombie import/export thread
1557  */
1558 int obd_zombie_impexp_init(void)
1559 {
1560         int rc;
1561
1562         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1563         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1564         spin_lock_init(&obd_zombie_impexp_lock);
1565         init_completion(&obd_zombie_start);
1566         init_completion(&obd_zombie_stop);
1567         cfs_waitq_init(&obd_zombie_waitq);
1568
1569 #ifdef __KERNEL__
1570         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1571         if (rc < 0)
1572                 RETURN(rc);
1573
1574         wait_for_completion(&obd_zombie_start);
1575 #else
1576
1577         obd_zombie_impexp_work_cb =
1578                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1579                                                  &obd_zombie_impexp_kill, NULL);
1580
1581         obd_zombie_impexp_idle_cb =
1582                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1583                                                  &obd_zombie_impexp_check, NULL);
1584         rc = 0;
1585 #endif
1586         RETURN(rc);
1587 }
1588 /**
1589  * stop destroy zombie import/export thread
1590  */
1591 void obd_zombie_impexp_stop(void)
1592 {
1593         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594         obd_zombie_impexp_notify();
1595 #ifdef __KERNEL__
1596         wait_for_completion(&obd_zombie_stop);
1597 #else
1598         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1599         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1600 #endif
1601 }