Whamcloud - gitweb
land clio.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510                                             struct obd_uuid *grp_uuid)
511 {
512         struct obd_device *obd;
513
514         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
515         if (!obd)
516                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
517                                             grp_uuid);
518         return obd;
519 }
520
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522    searching at *next, and if a device is found, the next index to look
523    at is saved in *next. If next is NULL, then the first matching device
524    will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
526 {
527         int i;
528
529         if (next == NULL)
530                 i = 0;
531         else if (*next >= 0 && *next < class_devno_max())
532                 i = *next;
533         else
534                 return NULL;
535
536         spin_lock(&obd_dev_lock);
537         for (; i < class_devno_max(); i++) {
538                 struct obd_device *obd = class_num2obd(i);
539                 if (obd == NULL)
540                         continue;
541                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
542                         if (next != NULL)
543                                 *next = i+1;
544                         spin_unlock(&obd_dev_lock);
545                         return obd;
546                 }
547         }
548         spin_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552
553
554 void obd_cleanup_caches(void)
555 {
556         int rc;
557
558         ENTRY;
559         if (obd_device_cachep) {
560                 rc = cfs_mem_cache_destroy(obd_device_cachep);
561                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562                 obd_device_cachep = NULL;
563         }
564         if (obdo_cachep) {
565                 rc = cfs_mem_cache_destroy(obdo_cachep);
566                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
567                 obdo_cachep = NULL;
568         }
569         if (import_cachep) {
570                 rc = cfs_mem_cache_destroy(import_cachep);
571                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572                 import_cachep = NULL;
573         }
574         if (capa_cachep) {
575                 rc = cfs_mem_cache_destroy(capa_cachep);
576                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
577                 capa_cachep = NULL;
578         }
579         EXIT;
580 }
581
582 int obd_init_caches(void)
583 {
584         ENTRY;
585
586         LASSERT(obd_device_cachep == NULL);
587         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588                                                  sizeof(struct obd_device),
589                                                  0, 0);
590         if (!obd_device_cachep)
591                 GOTO(out, -ENOMEM);
592
593         LASSERT(obdo_cachep == NULL);
594         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
595                                            0, 0);
596         if (!obdo_cachep)
597                 GOTO(out, -ENOMEM);
598
599         LASSERT(import_cachep == NULL);
600         import_cachep = cfs_mem_cache_create("ll_import_cache",
601                                              sizeof(struct obd_import),
602                                              0, 0);
603         if (!import_cachep)
604                 GOTO(out, -ENOMEM);
605
606         LASSERT(capa_cachep == NULL);
607         capa_cachep = cfs_mem_cache_create("capa_cache",
608                                            sizeof(struct obd_capa), 0, 0);
609         if (!capa_cachep)
610                 GOTO(out, -ENOMEM);
611
612         RETURN(0);
613  out:
614         obd_cleanup_caches();
615         RETURN(-ENOMEM);
616
617 }
618
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
621 {
622         struct obd_export *export;
623         ENTRY;
624
625         if (!conn) {
626                 CDEBUG(D_CACHE, "looking for null handle\n");
627                 RETURN(NULL);
628         }
629
630         if (conn->cookie == -1) {  /* this means assign a new connection */
631                 CDEBUG(D_CACHE, "want a new connection\n");
632                 RETURN(NULL);
633         }
634
635         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636         export = class_handle2object(conn->cookie);
637         RETURN(export);
638 }
639
640 struct obd_device *class_exp2obd(struct obd_export *exp)
641 {
642         if (exp)
643                 return exp->exp_obd;
644         return NULL;
645 }
646
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
648 {
649         struct obd_export *export;
650         export = class_conn2export(conn);
651         if (export) {
652                 struct obd_device *obd = export->exp_obd;
653                 class_export_put(export);
654                 return obd;
655         }
656         return NULL;
657 }
658
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
660 {
661         struct obd_device *obd = exp->exp_obd;
662         if (obd == NULL)
663                 return NULL;
664         return obd->u.cli.cl_import;
665 }
666
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
668 {
669         struct obd_device *obd = class_conn2obd(conn);
670         if (obd == NULL)
671                 return NULL;
672         return obd->u.cli.cl_import;
673 }
674
675 /* Export management functions */
676 static void export_handle_addref(void *export)
677 {
678         class_export_get(export);
679 }
680
681 struct obd_export *class_export_get(struct obd_export *exp)
682 {
683         atomic_inc(&exp->exp_refcount);
684         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685                atomic_read(&exp->exp_refcount));
686         return exp;
687 }
688 EXPORT_SYMBOL(class_export_get);
689
690 void class_export_put(struct obd_export *exp)
691 {
692         LASSERT(exp != NULL);
693         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694                atomic_read(&exp->exp_refcount) - 1);
695         LASSERT(atomic_read(&exp->exp_refcount) > 0);
696         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
697
698         if (atomic_dec_and_test(&exp->exp_refcount)) {
699                 LASSERT (list_empty(&exp->exp_obd_chain));
700
701                 CDEBUG(D_IOCTL, "final put %p/%s\n",
702                        exp, exp->exp_client_uuid.uuid);
703
704                 spin_lock(&obd_zombie_impexp_lock);
705                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706                 spin_unlock(&obd_zombie_impexp_lock);
707
708                 if (obd_zombie_impexp_notify != NULL)
709                         obd_zombie_impexp_notify();
710         }
711 }
712 EXPORT_SYMBOL(class_export_put);
713
714 static void class_export_destroy(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         ENTRY;
718
719         LASSERT (atomic_read(&exp->exp_refcount) == 0);
720
721         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722                exp->exp_client_uuid.uuid, obd->obd_name);
723
724         LASSERT(obd != NULL);
725
726         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727         if (exp->exp_connection)
728                 ptlrpc_put_connection_superhack(exp->exp_connection);
729
730         LASSERT(list_empty(&exp->exp_outstanding_replies));
731         LASSERT(list_empty(&exp->exp_req_replay_queue));
732         obd_destroy_export(exp);
733         class_decref(obd, "export", exp);
734
735         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736         EXIT;
737 }
738
739 /* Creates a new export, adds it to the hash table, and returns a
740  * pointer to it. The refcount is 2: one for the hash reference, and
741  * one for the pointer returned by this function. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743                                     struct obd_uuid *cluuid)
744 {
745         struct obd_export *export;
746         int rc = 0;
747
748         OBD_ALLOC_PTR(export);
749         if (!export)
750                 return ERR_PTR(-ENOMEM);
751
752         export->exp_conn_cnt = 0;
753         export->exp_lock_hash = NULL;
754         atomic_set(&export->exp_refcount, 2);
755         atomic_set(&export->exp_rpc_count, 0);
756         export->exp_obd = obd;
757         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
760         class_handle_hash(&export->exp_handle, export_handle_addref);
761         export->exp_last_request_time = cfs_time_current_sec();
762         spin_lock_init(&export->exp_lock);
763         INIT_HLIST_NODE(&export->exp_uuid_hash);
764         INIT_HLIST_NODE(&export->exp_nid_hash);
765
766         export->exp_sp_peer = LUSTRE_SP_ANY;
767         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768         export->exp_client_uuid = *cluuid;
769         obd_init_export(export);
770
771         spin_lock(&obd->obd_dev_lock);
772         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774                                             &export->exp_uuid_hash);
775                 if (rc != 0) {
776                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777                                       obd->obd_name, cluuid->uuid, rc);
778                         spin_unlock(&obd->obd_dev_lock);
779                         class_handle_unhash(&export->exp_handle);
780                         OBD_FREE_PTR(export);
781                         return ERR_PTR(-EALREADY);
782                 }
783         }
784
785         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786         class_incref(obd, "export", export);
787         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788         list_add_tail(&export->exp_obd_chain_timed,
789                       &export->exp_obd->obd_exports_timed);
790         export->exp_obd->obd_num_exports++;
791         spin_unlock(&obd->obd_dev_lock);
792
793         return export;
794 }
795 EXPORT_SYMBOL(class_new_export);
796
797 void class_unlink_export(struct obd_export *exp)
798 {
799         class_handle_unhash(&exp->exp_handle);
800
801         spin_lock(&exp->exp_obd->obd_dev_lock);
802         /* delete an uuid-export hashitem from hashtables */
803         if (!hlist_unhashed(&exp->exp_uuid_hash))
804                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805                                 &exp->exp_client_uuid,
806                                 &exp->exp_uuid_hash);
807
808         list_del_init(&exp->exp_obd_chain);
809         list_del_init(&exp->exp_obd_chain_timed);
810         exp->exp_obd->obd_num_exports--;
811         spin_unlock(&exp->exp_obd->obd_dev_lock);
812
813         class_export_put(exp);
814 }
815 EXPORT_SYMBOL(class_unlink_export);
816
817 /* Import management functions */
818 static void import_handle_addref(void *import)
819 {
820         class_import_get(import);
821 }
822
823 struct obd_import *class_import_get(struct obd_import *import)
824 {
825         LASSERT(atomic_read(&import->imp_refcount) >= 0);
826         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827         atomic_inc(&import->imp_refcount);
828         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829                atomic_read(&import->imp_refcount));
830         return import;
831 }
832 EXPORT_SYMBOL(class_import_get);
833
834 void class_import_put(struct obd_import *import)
835 {
836         ENTRY;
837
838         LASSERT(atomic_read(&import->imp_refcount) > 0);
839         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
840         LASSERT(list_empty(&import->imp_zombie_chain));
841
842         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843                atomic_read(&import->imp_refcount) - 1);
844
845         if (atomic_dec_and_test(&import->imp_refcount)) {
846
847                 CDEBUG(D_INFO, "final put import %p\n", import);
848
849                 spin_lock(&obd_zombie_impexp_lock);
850                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851                 spin_unlock(&obd_zombie_impexp_lock);
852
853                 if (obd_zombie_impexp_notify != NULL)
854                         obd_zombie_impexp_notify();
855         }
856
857         EXIT;
858 }
859 EXPORT_SYMBOL(class_import_put);
860
861 void class_import_destroy(struct obd_import *import)
862 {
863         ENTRY;
864
865         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866                 import->imp_obd->obd_name);
867
868         LASSERT(atomic_read(&import->imp_refcount) == 0);
869
870         ptlrpc_put_connection_superhack(import->imp_connection);
871
872         while (!list_empty(&import->imp_conn_list)) {
873                 struct obd_import_conn *imp_conn;
874
875                 imp_conn = list_entry(import->imp_conn_list.next,
876                                       struct obd_import_conn, oic_item);
877                 list_del(&imp_conn->oic_item);
878                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879                 OBD_FREE(imp_conn, sizeof(*imp_conn));
880         }
881
882         LASSERT(import->imp_sec == NULL);
883         class_decref(import->imp_obd, "import", import);
884         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
885         EXIT;
886 }
887
888 static void init_imp_at(struct imp_at *at) {
889         int i;
890         at_init(&at->iat_net_latency, 0, 0);
891         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892                 /* max service estimates are tracked on the server side, so
893                    don't use the AT history here, just use the last reported
894                    val. (But keep hist for proc histogram, worst_ever) */
895                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
896                         AT_FLG_NOHIST);
897         }
898 }
899
900 struct obd_import *class_new_import(struct obd_device *obd)
901 {
902         struct obd_import *imp;
903
904         OBD_ALLOC(imp, sizeof(*imp));
905         if (imp == NULL)
906                 return NULL;
907
908         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912         spin_lock_init(&imp->imp_lock);
913         imp->imp_last_success_conn = 0;
914         imp->imp_state = LUSTRE_IMP_NEW;
915         imp->imp_obd = class_incref(obd, "import", imp);
916         sema_init(&imp->imp_sec_mutex, 1);
917         cfs_waitq_init(&imp->imp_recovery_waitq);
918
919         atomic_set(&imp->imp_refcount, 2);
920         atomic_set(&imp->imp_unregistering, 0);
921         atomic_set(&imp->imp_inflight, 0);
922         atomic_set(&imp->imp_replay_inflight, 0);
923         atomic_set(&imp->imp_inval_count, 0);
924         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
925         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
926         class_handle_hash(&imp->imp_handle, import_handle_addref);
927         init_imp_at(&imp->imp_at);
928
929         /* the default magic is V2, will be used in connect RPC, and
930          * then adjusted according to the flags in request/reply. */
931         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
932
933         return imp;
934 }
935 EXPORT_SYMBOL(class_new_import);
936
937 void class_destroy_import(struct obd_import *import)
938 {
939         LASSERT(import != NULL);
940         LASSERT(import != LP_POISON);
941
942         class_handle_unhash(&import->imp_handle);
943
944         spin_lock(&import->imp_lock);
945         import->imp_generation++;
946         spin_unlock(&import->imp_lock);
947         class_import_put(import);
948 }
949 EXPORT_SYMBOL(class_destroy_import);
950
951 /* A connection defines an export context in which preallocation can
952    be managed. This releases the export pointer reference, and returns
953    the export handle, so the export refcount is 1 when this function
954    returns. */
955 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
956                   struct obd_uuid *cluuid)
957 {
958         struct obd_export *export;
959         LASSERT(conn != NULL);
960         LASSERT(obd != NULL);
961         LASSERT(cluuid != NULL);
962         ENTRY;
963
964         export = class_new_export(obd, cluuid);
965         if (IS_ERR(export))
966                 RETURN(PTR_ERR(export));
967
968         conn->cookie = export->exp_handle.h_cookie;
969         class_export_put(export);
970
971         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
972                cluuid->uuid, conn->cookie);
973         RETURN(0);
974 }
975 EXPORT_SYMBOL(class_connect);
976
977 /* if export is involved in recovery then clean up related things */
978 void class_export_recovery_cleanup(struct obd_export *exp)
979 {
980         struct obd_device *obd = exp->exp_obd;
981
982         spin_lock_bh(&obd->obd_processing_task_lock);
983         if (obd->obd_recovering && exp->exp_in_recovery) {
984                 spin_lock(&exp->exp_lock);
985                 exp->exp_in_recovery = 0;
986                 spin_unlock(&exp->exp_lock);
987                 obd->obd_connected_clients--;
988                 /* each connected client is counted as recoverable */
989                 obd->obd_recoverable_clients--;
990                 if (exp->exp_req_replay_needed) {
991                         spin_lock(&exp->exp_lock);
992                         exp->exp_req_replay_needed = 0;
993                         spin_unlock(&exp->exp_lock);
994                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
995                         atomic_dec(&obd->obd_req_replay_clients);
996                 }
997                 if (exp->exp_lock_replay_needed) {
998                         spin_lock(&exp->exp_lock);
999                         exp->exp_lock_replay_needed = 0;
1000                         spin_unlock(&exp->exp_lock);
1001                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1002                         atomic_dec(&obd->obd_lock_replay_clients);
1003                 }
1004         }
1005         spin_unlock_bh(&obd->obd_processing_task_lock);
1006 }
1007
1008 /* This function removes two references from the export: one for the
1009  * hash entry and one for the export pointer passed in.  The export
1010  * pointer passed to this function is destroyed should not be used
1011  * again. */
1012 int class_disconnect(struct obd_export *export)
1013 {
1014         int already_disconnected;
1015         ENTRY;
1016
1017         if (export == NULL) {
1018                 fixme();
1019                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1020                 RETURN(-EINVAL);
1021         }
1022
1023         spin_lock(&export->exp_lock);
1024         already_disconnected = export->exp_disconnected;
1025         export->exp_disconnected = 1;
1026
1027         if (!hlist_unhashed(&export->exp_nid_hash))
1028                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1029                                 &export->exp_connection->c_peer.nid,
1030                                 &export->exp_nid_hash);
1031
1032         spin_unlock(&export->exp_lock);
1033
1034         /* class_cleanup(), abort_recovery(), and class_fail_export()
1035          * all end up in here, and if any of them race we shouldn't
1036          * call extra class_export_puts(). */
1037         if (already_disconnected)
1038                 RETURN(0);
1039
1040         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1041                export->exp_handle.h_cookie);
1042
1043         class_export_recovery_cleanup(export);
1044         class_unlink_export(export);
1045         class_export_put(export);
1046         RETURN(0);
1047 }
1048
1049 static void class_disconnect_export_list(struct list_head *list, int flags)
1050 {
1051         int rc;
1052         struct lustre_handle fake_conn;
1053         struct obd_export *fake_exp, *exp;
1054         ENTRY;
1055
1056         /* It's possible that an export may disconnect itself, but
1057          * nothing else will be added to this list. */
1058         while (!list_empty(list)) {
1059                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1060                 class_export_get(exp);
1061
1062                 spin_lock(&exp->exp_lock);
1063                 exp->exp_flags = flags;
1064                 spin_unlock(&exp->exp_lock);
1065
1066                 if (obd_uuid_equals(&exp->exp_client_uuid,
1067                                     &exp->exp_obd->obd_uuid)) {
1068                         CDEBUG(D_HA,
1069                                "exp %p export uuid == obd uuid, don't discon\n",
1070                                exp);
1071                         /* Need to delete this now so we don't end up pointing
1072                          * to work_list later when this export is cleaned up. */
1073                         list_del_init(&exp->exp_obd_chain);
1074                         class_export_put(exp);
1075                         continue;
1076                 }
1077
1078                 fake_conn.cookie = exp->exp_handle.h_cookie;
1079                 fake_exp = class_conn2export(&fake_conn);
1080                 if (!fake_exp) {
1081                         class_export_put(exp);
1082                         continue;
1083                 }
1084
1085                 spin_lock(&fake_exp->exp_lock);
1086                 fake_exp->exp_flags = flags;
1087                 spin_unlock(&fake_exp->exp_lock);
1088
1089                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1090                        "last request at "CFS_TIME_T"\n",
1091                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1092                        exp, exp->exp_last_request_time);
1093                 rc = obd_disconnect(fake_exp);
1094                 class_export_put(exp);
1095         }
1096         EXIT;
1097 }
1098
1099 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1100 {
1101         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1102                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1103 }
1104
1105 void class_disconnect_exports(struct obd_device *obd)
1106 {
1107         struct list_head work_list;
1108         ENTRY;
1109
1110         /* Move all of the exports from obd_exports to a work list, en masse. */
1111         spin_lock(&obd->obd_dev_lock);
1112         list_add(&work_list, &obd->obd_exports);
1113         list_del_init(&obd->obd_exports);
1114         spin_unlock(&obd->obd_dev_lock);
1115
1116         if (!list_empty(&work_list)) {
1117                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1118                        "disconnecting them\n", obd->obd_minor, obd);
1119                 class_disconnect_export_list(&work_list,
1120                                              get_exp_flags_from_obd(obd));
1121         } else
1122                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1123                        obd->obd_minor, obd);
1124         EXIT;
1125 }
1126 EXPORT_SYMBOL(class_disconnect_exports);
1127
1128 /* Remove exports that have not completed recovery.
1129  */
1130 int class_disconnect_stale_exports(struct obd_device *obd,
1131                                    int (*test_export)(struct obd_export *))
1132 {
1133         struct list_head work_list;
1134         struct list_head *pos, *n;
1135         struct obd_export *exp;
1136         int cnt = 0;
1137         ENTRY;
1138
1139         CFS_INIT_LIST_HEAD(&work_list);
1140         spin_lock(&obd->obd_dev_lock);
1141         list_for_each_safe(pos, n, &obd->obd_exports) {
1142                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1143                 if (test_export(exp))
1144                         continue;
1145
1146                 list_del(&exp->exp_obd_chain);
1147                 list_add(&exp->exp_obd_chain, &work_list);
1148                 /* don't count self-export as client */
1149                 if (obd_uuid_equals(&exp->exp_client_uuid,
1150                                      &exp->exp_obd->obd_uuid))
1151                         continue;
1152
1153                 cnt++;
1154                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1155                        obd->obd_name, exp->exp_client_uuid.uuid,
1156                        exp->exp_connection == NULL ? "<unknown>" :
1157                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1158         }
1159         spin_unlock(&obd->obd_dev_lock);
1160
1161         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1162                obd->obd_name, cnt);
1163         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1164         RETURN(cnt);
1165 }
1166 EXPORT_SYMBOL(class_disconnect_stale_exports);
1167
1168 void class_fail_export(struct obd_export *exp)
1169 {
1170         int rc, already_failed;
1171
1172         spin_lock(&exp->exp_lock);
1173         already_failed = exp->exp_failed;
1174         exp->exp_failed = 1;
1175         spin_unlock(&exp->exp_lock);
1176
1177         if (already_failed) {
1178                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1179                        exp, exp->exp_client_uuid.uuid);
1180                 return;
1181         }
1182
1183         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1184                exp, exp->exp_client_uuid.uuid);
1185
1186         if (obd_dump_on_timeout)
1187                 libcfs_debug_dumplog();
1188
1189         /* Most callers into obd_disconnect are removing their own reference
1190          * (request, for example) in addition to the one from the hash table.
1191          * We don't have such a reference here, so make one. */
1192         class_export_get(exp);
1193         rc = obd_disconnect(exp);
1194         if (rc)
1195                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1196         else
1197                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1198                        exp, exp->exp_client_uuid.uuid);
1199 }
1200 EXPORT_SYMBOL(class_fail_export);
1201
1202 char *obd_export_nid2str(struct obd_export *exp)
1203 {
1204         if (exp->exp_connection != NULL)
1205                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1206
1207         return "(no nid)";
1208 }
1209 EXPORT_SYMBOL(obd_export_nid2str);
1210
1211 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1212 {
1213         struct obd_export *doomed_exp = NULL;
1214         int exports_evicted = 0;
1215
1216         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1217
1218         do {
1219                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1220                 if (doomed_exp == NULL)
1221                         break;
1222
1223                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1224                          "nid %s found, wanted nid %s, requested nid %s\n",
1225                          obd_export_nid2str(doomed_exp),
1226                          libcfs_nid2str(nid_key), nid);
1227                 LASSERTF(doomed_exp != obd->obd_self_export,
1228                          "self-export is hashed by NID?\n");
1229                 exports_evicted++;
1230                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1231                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1232                        exports_evicted);
1233                 class_fail_export(doomed_exp);
1234                 class_export_put(doomed_exp);
1235         } while (1);
1236
1237         if (!exports_evicted)
1238                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1239                        obd->obd_name, nid);
1240         return exports_evicted;
1241 }
1242 EXPORT_SYMBOL(obd_export_evict_by_nid);
1243
1244 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1245 {
1246         struct obd_export *doomed_exp = NULL;
1247         struct obd_uuid doomed_uuid;
1248         int exports_evicted = 0;
1249
1250         obd_str2uuid(&doomed_uuid, uuid);
1251         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1252                 CERROR("%s: can't evict myself\n", obd->obd_name);
1253                 return exports_evicted;
1254         }
1255
1256         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1257
1258         if (doomed_exp == NULL) {
1259                 CERROR("%s: can't disconnect %s: no exports found\n",
1260                        obd->obd_name, uuid);
1261         } else {
1262                 CWARN("%s: evicting %s at adminstrative request\n",
1263                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1264                 class_fail_export(doomed_exp);
1265                 class_export_put(doomed_exp);
1266                 exports_evicted++;
1267         }
1268
1269         return exports_evicted;
1270 }
1271 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1272
1273 /**
1274  * kill zombie imports and exports
1275  */
1276 void obd_zombie_impexp_cull(void)
1277 {
1278         struct obd_import *import;
1279         struct obd_export *export;
1280         ENTRY;
1281
1282         do {
1283                 spin_lock (&obd_zombie_impexp_lock);
1284
1285                 import = NULL;
1286                 if (!list_empty(&obd_zombie_imports)) {
1287                         import = list_entry(obd_zombie_imports.next,
1288                                             struct obd_import,
1289                                             imp_zombie_chain);
1290                         list_del(&import->imp_zombie_chain);
1291                 }
1292
1293                 export = NULL;
1294                 if (!list_empty(&obd_zombie_exports)) {
1295                         export = list_entry(obd_zombie_exports.next,
1296                                             struct obd_export,
1297                                             exp_obd_chain);
1298                         list_del_init(&export->exp_obd_chain);
1299                 }
1300
1301                 spin_unlock(&obd_zombie_impexp_lock);
1302
1303                 if (import != NULL)
1304                         class_import_destroy(import);
1305
1306                 if (export != NULL)
1307                         class_export_destroy(export);
1308
1309         } while (import != NULL || export != NULL);
1310         EXIT;
1311 }
1312
1313 static struct completion        obd_zombie_start;
1314 static struct completion        obd_zombie_stop;
1315 static unsigned long            obd_zombie_flags;
1316 static cfs_waitq_t              obd_zombie_waitq;
1317
1318 enum {
1319         OBD_ZOMBIE_STOP = 1
1320 };
1321
1322 /**
1323  * check for work for kill zombie import/export thread.
1324  */
1325 static int obd_zombie_impexp_check(void *arg)
1326 {
1327         int rc;
1328
1329         spin_lock(&obd_zombie_impexp_lock);
1330         rc = list_empty(&obd_zombie_imports) &&
1331              list_empty(&obd_zombie_exports) &&
1332              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1333
1334         spin_unlock(&obd_zombie_impexp_lock);
1335
1336         RETURN(rc);
1337 }
1338
1339 /**
1340  * notify import/export destroy thread about new zombie.
1341  */
1342 static void obd_zombie_impexp_notify(void)
1343 {
1344         cfs_waitq_signal(&obd_zombie_waitq);
1345 }
1346
1347 /**
1348  * check whether obd_zombie is idle
1349  */
1350 static int obd_zombie_is_idle(void)
1351 {
1352         int rc;
1353
1354         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1355         spin_lock(&obd_zombie_impexp_lock);
1356         rc = list_empty(&obd_zombie_imports) &&
1357              list_empty(&obd_zombie_exports);
1358         spin_unlock(&obd_zombie_impexp_lock);
1359         return rc;
1360 }
1361
1362 /**
1363  * wait when obd_zombie import/export queues become empty
1364  */
1365 void obd_zombie_barrier(void)
1366 {
1367         struct l_wait_info lwi = { 0 };
1368
1369         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1370 }
1371 EXPORT_SYMBOL(obd_zombie_barrier);
1372
1373 #ifdef __KERNEL__
1374
1375 /**
1376  * destroy zombie export/import thread.
1377  */
1378 static int obd_zombie_impexp_thread(void *unused)
1379 {
1380         int rc;
1381
1382         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1383                 complete(&obd_zombie_start);
1384                 RETURN(rc);
1385         }
1386
1387         complete(&obd_zombie_start);
1388
1389         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1390                 struct l_wait_info lwi = { 0 };
1391
1392                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1393
1394                 obd_zombie_impexp_cull();
1395                 /* Notify obd_zombie_barrier callers that queues may be empty */
1396                 cfs_waitq_signal(&obd_zombie_waitq);
1397         }
1398
1399         complete(&obd_zombie_stop);
1400
1401         RETURN(0);
1402 }
1403
1404 #else /* ! KERNEL */
1405
1406 static atomic_t zombie_recur = ATOMIC_INIT(0);
1407 static void *obd_zombie_impexp_work_cb;
1408 static void *obd_zombie_impexp_idle_cb;
1409
1410 int obd_zombie_impexp_kill(void *arg)
1411 {
1412         int rc = 0;
1413
1414         if (atomic_inc_return(&zombie_recur) == 1) {
1415                 obd_zombie_impexp_cull();
1416                 rc = 1;
1417         }
1418         atomic_dec(&zombie_recur);
1419         return rc;
1420 }
1421
1422 #endif
1423
1424 /**
1425  * start destroy zombie import/export thread
1426  */
1427 int obd_zombie_impexp_init(void)
1428 {
1429         int rc;
1430
1431         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1432         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1433         spin_lock_init(&obd_zombie_impexp_lock);
1434         init_completion(&obd_zombie_start);
1435         init_completion(&obd_zombie_stop);
1436         cfs_waitq_init(&obd_zombie_waitq);
1437
1438 #ifdef __KERNEL__
1439         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1440         if (rc < 0)
1441                 RETURN(rc);
1442
1443         wait_for_completion(&obd_zombie_start);
1444 #else
1445
1446         obd_zombie_impexp_work_cb =
1447                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1448                                                  &obd_zombie_impexp_kill, NULL);
1449
1450         obd_zombie_impexp_idle_cb =
1451                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1452                                                  &obd_zombie_impexp_check, NULL);
1453         rc = 0;
1454
1455 #endif
1456         RETURN(rc);
1457 }
1458 /**
1459  * stop destroy zombie import/export thread
1460  */
1461 void obd_zombie_impexp_stop(void)
1462 {
1463         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1464         obd_zombie_impexp_notify();
1465 #ifdef __KERNEL__
1466         wait_for_completion(&obd_zombie_stop);
1467 #else
1468         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1469         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1470 #endif
1471 }