Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types; /* list of registered obd_type entries, linked through typ_chain */
52 spinlock_t obd_types_lock; /* taken in this file around every walk or update of obd_types */
53
54 cfs_mem_cache_t *obd_device_cachep; /* slab cache used by obd_device_alloc()/obd_device_free() */
55 cfs_mem_cache_t *obdo_cachep; /* slab cache sized for struct obdo, see obd_init_caches() */
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep; /* slab cache sized for struct obd_import, see obd_init_caches() */
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp); /* called on the final class_export_put() */
64 static void obd_zombie_import_add(struct obd_import *imp); /* called on the final class_import_put() */
65 static void print_export_data(struct obd_export *exp, const char *status);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c); /* called by the export/import destroy paths; NOTE(review): not assigned in the visible code, presumably set by ptlrpc */
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83 EXPORT_SYMBOL(obd_device_alloc);
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef CONFIG_KMOD
121         if (!type) {
122                 const char *modname = name;
123                 if (!request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 try_module_get(type->typ_dt_ops->o_owner);
136                 spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         module_put(type->typ_dt_ops->o_owner);
147         spin_unlock(&type->obd_type_lock);
148 }
149
150 #define CLASS_MAX_NAME 1024
151
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153                         struct lprocfs_vars *vars, const char *name,
154                         struct lu_device_type *ldt)
155 {
156         struct obd_type *type;
157         int rc = 0;
158         ENTRY;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 RETURN(-EEXIST);
166         }
167
168         rc = -ENOMEM;
169         OBD_ALLOC(type, sizeof(*type));
170         if (type == NULL)
171                 RETURN(rc);
172
173         OBD_ALLOC_PTR(type->typ_dt_ops);
174         OBD_ALLOC_PTR(type->typ_md_ops);
175         OBD_ALLOC(type->typ_name, strlen(name) + 1);
176
177         if (type->typ_dt_ops == NULL ||
178             type->typ_md_ops == NULL ||
179             type->typ_name == NULL)
180                 GOTO (failed, rc);
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         spin_lock_init(&type->obd_type_lock);
188
189 #ifdef LPROCFS
190         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191                                               vars, type);
192         if (IS_ERR(type->typ_procroot)) {
193                 rc = PTR_ERR(type->typ_procroot);
194                 type->typ_procroot = NULL;
195                 GOTO (failed, rc);
196         }
197 #endif
198         if (ldt != NULL) {
199                 type->typ_lu = ldt;
200                 rc = lu_device_type_init(ldt);
201                 if (rc != 0)
202                         GOTO (failed, rc);
203         }
204
205         spin_lock(&obd_types_lock);
206         list_add(&type->typ_chain, &obd_types);
207         spin_unlock(&obd_types_lock);
208
209         RETURN (0);
210
211  failed:
212         if (type->typ_name != NULL)
213                 OBD_FREE(type->typ_name, strlen(name) + 1);
214         if (type->typ_md_ops != NULL)
215                 OBD_FREE_PTR(type->typ_md_ops);
216         if (type->typ_dt_ops != NULL)
217                 OBD_FREE_PTR(type->typ_dt_ops);
218         OBD_FREE(type, sizeof(*type));
219         RETURN(rc);
220 }
221
222 int class_unregister_type(const char *name)
223 {
224         struct obd_type *type = class_search_type(name);
225         ENTRY;
226
227         if (!type) {
228                 CERROR("unknown obd type\n");
229                 RETURN(-EINVAL);
230         }
231
232         if (type->typ_refcnt) {
233                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234                 /* This is a bad situation, let's make the best of it */
235                 /* Remove ops, but leave the name for debugging */
236                 OBD_FREE_PTR(type->typ_dt_ops);
237                 OBD_FREE_PTR(type->typ_md_ops);
238                 RETURN(-EBUSY);
239         }
240
241         if (type->typ_procroot) {
242                 lprocfs_remove(&type->typ_procroot);
243         }
244
245         if (type->typ_lu)
246                 lu_device_type_fini(type->typ_lu);
247
248         spin_lock(&obd_types_lock);
249         list_del(&type->typ_chain);
250         spin_unlock(&obd_types_lock);
251         OBD_FREE(type->typ_name, strlen(name) + 1);
252         if (type->typ_dt_ops != NULL)
253                 OBD_FREE_PTR(type->typ_dt_ops);
254         if (type->typ_md_ops != NULL)
255                 OBD_FREE_PTR(type->typ_md_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(0);
258 } /* class_unregister_type */
259
260 /**
261  * Create a new obd device.
262  *
263  * Find an empty slot in ::obd_devs[], create a new obd device in it.
264  *
265  * \param[in] type_name obd device type string.
266  * \param[in] name      obd device name.
267  *
268  * \retval NULL if create fails, otherwise return the obd device
269  *         pointer created.
270  */
271 struct obd_device *class_newdev(const char *type_name, const char *name)
272 {
273         struct obd_device *result = NULL;
274         struct obd_device *newdev;
275         struct obd_type *type = NULL;
276         int i;
277         int new_obd_minor = 0;
278
279         if (strlen(name) >= MAX_OBD_NAME) {
280                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281                 RETURN(ERR_PTR(-EINVAL));
282         }
283
284         type = class_get_type(type_name);
285         if (type == NULL){
286                 CERROR("OBD: unknown type: %s\n", type_name);
287                 RETURN(ERR_PTR(-ENODEV));
288         }
289
290         newdev = obd_device_alloc();
291         if (newdev == NULL) {
292                 class_put_type(type);
293                 RETURN(ERR_PTR(-ENOMEM));
294         }
295         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296
297         spin_lock(&obd_dev_lock);
298         for (i = 0; i < class_devno_max(); i++) {
299                 struct obd_device *obd = class_num2obd(i);
300                 if (obd && obd->obd_name &&
301                     (strcmp(name, obd->obd_name) == 0)) {
302                         CERROR("Device %s already exists, won't add\n", name);
303                         if (result) {
304                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305                                          "%p obd_magic %08x != %08x\n", result,
306                                          result->obd_magic, OBD_DEVICE_MAGIC);
307                                 LASSERTF(result->obd_minor == new_obd_minor,
308                                          "%p obd_minor %d != %d\n", result,
309                                          result->obd_minor, new_obd_minor);
310
311                                 obd_devs[result->obd_minor] = NULL;
312                                 result->obd_name[0]='\0';
313                          }
314                         result = ERR_PTR(-EEXIST);
315                         break;
316                 }
317                 if (!result && !obd) {
318                         result = newdev;
319                         result->obd_minor = i;
320                         new_obd_minor = i;
321                         result->obd_type = type;
322                         strncpy(result->obd_name, name,
323                                 sizeof(result->obd_name) - 1);
324                         obd_devs[i] = result;
325                 }
326         }
327         spin_unlock(&obd_dev_lock);
328
329         if (result == NULL && i >= class_devno_max()) {
330                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331                        class_devno_max());
332                 result = ERR_PTR(-EOVERFLOW);
333         }
334
335         if (IS_ERR(result)) {
336                 obd_device_free(newdev);
337                 class_put_type(type);
338         } else {
339                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340                        result->obd_name, result);
341         }
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type != NULL);
354
355         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356                obd->obd_name,obd->obd_type->typ_name);
357
358         spin_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         spin_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         spin_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377                         /* Make sure we finished attaching before we give
378                            out any references */
379                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380                         if (obd->obd_attached) {
381                                 spin_unlock(&obd_dev_lock);
382                                 return i;
383                         }
384                         break;
385                 }
386         }
387         spin_unlock(&obd_dev_lock);
388
389         return -1;
390 }
391
392 struct obd_device *class_name2obd(const char *name)
393 {
394         int dev = class_name2dev(name);
395
396         if (dev < 0 || dev > class_devno_max())
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 int class_uuid2dev(struct obd_uuid *uuid)
402 {
403         int i;
404
405         spin_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         spin_unlock(&obd_dev_lock);
411                         return i;
412                 }
413         }
414         spin_unlock(&obd_dev_lock);
415
416         return -1;
417 }
418
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 {
421         int dev = class_uuid2dev(uuid);
422         if (dev < 0)
423                 return NULL;
424         return class_num2obd(dev);
425 }
426
427 /**
428  * Get obd device from ::obd_devs[]
429  *
430  * \param num [in] array index
431  *
432  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433  *         otherwise return the obd device there.
434  */
435 struct obd_device *class_num2obd(int num)
436 {
437         struct obd_device *obd = NULL;
438
439         if (num < class_devno_max()) {
440                 obd = obd_devs[num];
441                 if (obd == NULL)
442                         return NULL;
443
444                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445                          "%p obd_magic %08x != %08x\n",
446                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447                 LASSERTF(obd->obd_minor == num,
448                          "%p obd_minor %0d != %0d\n",
449                          obd, obd->obd_minor, num);
450         }
451
452         return obd;
453 }
454
455 void class_obd_list(void)
456 {
457         char *status;
458         int i;
459
460         spin_lock(&obd_dev_lock);
461         for (i = 0; i < class_devno_max(); i++) {
462                 struct obd_device *obd = class_num2obd(i);
463                 if (obd == NULL)
464                         continue;
465                 if (obd->obd_stopping)
466                         status = "ST";
467                 else if (obd->obd_set_up)
468                         status = "UP";
469                 else if (obd->obd_attached)
470                         status = "AT";
471                 else
472                         status = "--";
473                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474                          i, status, obd->obd_type->typ_name,
475                          obd->obd_name, obd->obd_uuid.uuid,
476                          atomic_read(&obd->obd_refcount));
477         }
478         spin_unlock(&obd_dev_lock);
479         return;
480 }
481
482 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
483    specified, then only the client with that uuid is returned,
484    otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486                                           const char * typ_name,
487                                           struct obd_uuid *grp_uuid)
488 {
489         int i;
490
491         spin_lock(&obd_dev_lock);
492         for (i = 0; i < class_devno_max(); i++) {
493                 struct obd_device *obd = class_num2obd(i);
494                 if (obd == NULL)
495                         continue;
496                 if ((strncmp(obd->obd_type->typ_name, typ_name,
497                              strlen(typ_name)) == 0)) {
498                         if (obd_uuid_equals(tgt_uuid,
499                                             &obd->u.cli.cl_target_uuid) &&
500                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
501                                                          &obd->obd_uuid) : 1)) {
502                                 spin_unlock(&obd_dev_lock);
503                                 return obd;
504                         }
505                 }
506         }
507         spin_unlock(&obd_dev_lock);
508
509         return NULL;
510 }
511
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513    searching at *next, and if a device is found, the next index to look
514    at is saved in *next. If next is NULL, then the first matching device
515    will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
517 {
518         int i;
519
520         if (next == NULL)
521                 i = 0;
522         else if (*next >= 0 && *next < class_devno_max())
523                 i = *next;
524         else
525                 return NULL;
526
527         spin_lock(&obd_dev_lock);
528         for (; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530                 if (obd == NULL)
531                         continue;
532                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
533                         if (next != NULL)
534                                 *next = i+1;
535                         spin_unlock(&obd_dev_lock);
536                         return obd;
537                 }
538         }
539         spin_unlock(&obd_dev_lock);
540
541         return NULL;
542 }
543
544 /**
545  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546  * adjust sptlrpc settings accordingly.
547  */
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 {
550         struct obd_device  *obd;
551         const char         *type;
552         int                 i, rc = 0, rc2;
553
554         LASSERT(namelen > 0);
555
556         spin_lock(&obd_dev_lock);
557         for (i = 0; i < class_devno_max(); i++) {
558                 obd = class_num2obd(i);
559
560                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
561                         continue;
562
563                 /* only notify mdc, osc, mdt, ost */
564                 type = obd->obd_type->typ_name;
565                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OST_NAME) != 0)
569                         continue;
570
571                 if (strncmp(obd->obd_name, fsname, namelen))
572                         continue;
573
574                 class_incref(obd, __FUNCTION__, obd);
575                 spin_unlock(&obd_dev_lock);
576                 rc2 = obd_set_info_async(obd->obd_self_export,
577                                          sizeof(KEY_SPTLRPC_CONF),
578                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
579                 rc = rc ? rc : rc2;
580                 class_decref(obd, __FUNCTION__, obd);
581                 spin_lock(&obd_dev_lock);
582         }
583         spin_unlock(&obd_dev_lock);
584         return rc;
585 }
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587
588 void obd_cleanup_caches(void)
589 {
590         int rc;
591
592         ENTRY;
593         if (obd_device_cachep) {
594                 rc = cfs_mem_cache_destroy(obd_device_cachep);
595                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596                 obd_device_cachep = NULL;
597         }
598         if (obdo_cachep) {
599                 rc = cfs_mem_cache_destroy(obdo_cachep);
600                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601                 obdo_cachep = NULL;
602         }
603         if (import_cachep) {
604                 rc = cfs_mem_cache_destroy(import_cachep);
605                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606                 import_cachep = NULL;
607         }
608         if (capa_cachep) {
609                 rc = cfs_mem_cache_destroy(capa_cachep);
610                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
611                 capa_cachep = NULL;
612         }
613         EXIT;
614 }
615
616 int obd_init_caches(void)
617 {
618         ENTRY;
619
620         LASSERT(obd_device_cachep == NULL);
621         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622                                                  sizeof(struct obd_device),
623                                                  0, 0);
624         if (!obd_device_cachep)
625                 GOTO(out, -ENOMEM);
626
627         LASSERT(obdo_cachep == NULL);
628         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
629                                            0, 0);
630         if (!obdo_cachep)
631                 GOTO(out, -ENOMEM);
632
633         LASSERT(import_cachep == NULL);
634         import_cachep = cfs_mem_cache_create("ll_import_cache",
635                                              sizeof(struct obd_import),
636                                              0, 0);
637         if (!import_cachep)
638                 GOTO(out, -ENOMEM);
639
640         LASSERT(capa_cachep == NULL);
641         capa_cachep = cfs_mem_cache_create("capa_cache",
642                                            sizeof(struct obd_capa), 0, 0);
643         if (!capa_cachep)
644                 GOTO(out, -ENOMEM);
645
646         RETURN(0);
647  out:
648         obd_cleanup_caches();
649         RETURN(-ENOMEM);
650
651 }
652
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 {
656         struct obd_export *export;
657         ENTRY;
658
659         if (!conn) {
660                 CDEBUG(D_CACHE, "looking for null handle\n");
661                 RETURN(NULL);
662         }
663
664         if (conn->cookie == -1) {  /* this means assign a new connection */
665                 CDEBUG(D_CACHE, "want a new connection\n");
666                 RETURN(NULL);
667         }
668
669         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670         export = class_handle2object(conn->cookie);
671         RETURN(export);
672 }
673
674 struct obd_device *class_exp2obd(struct obd_export *exp)
675 {
676         if (exp)
677                 return exp->exp_obd;
678         return NULL;
679 }
680
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 {
683         struct obd_export *export;
684         export = class_conn2export(conn);
685         if (export) {
686                 struct obd_device *obd = export->exp_obd;
687                 class_export_put(export);
688                 return obd;
689         }
690         return NULL;
691 }
692
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 {
695         struct obd_device *obd = exp->exp_obd;
696         if (obd == NULL)
697                 return NULL;
698         return obd->u.cli.cl_import;
699 }
700
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 {
703         struct obd_device *obd = class_conn2obd(conn);
704         if (obd == NULL)
705                 return NULL;
706         return obd->u.cli.cl_import;
707 }
708
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
711 {
712         struct obd_device *obd = exp->exp_obd;
713         ENTRY;
714
715         LASSERT (atomic_read(&exp->exp_refcount) == 0);
716
717         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718                exp->exp_client_uuid.uuid, obd->obd_name);
719
720         LASSERT(obd != NULL);
721
722         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723         if (exp->exp_connection)
724                 ptlrpc_put_connection_superhack(exp->exp_connection);
725
726         LASSERT(list_empty(&exp->exp_outstanding_replies));
727         LASSERT(list_empty(&exp->exp_uncommitted_replies));
728         LASSERT(list_empty(&exp->exp_req_replay_queue));
729         LASSERT(list_empty(&exp->exp_queued_rpc));
730         obd_destroy_export(exp);
731         class_decref(obd, "export", exp);
732
733         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
734         EXIT;
735 }
736
/* Handle-table addref callback for exports (see class_handle_hash() in
 * class_new_export()): takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741
742 struct obd_export *class_export_get(struct obd_export *exp)
743 {
744         atomic_inc(&exp->exp_refcount);
745         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746                atomic_read(&exp->exp_refcount));
747         return exp;
748 }
749 EXPORT_SYMBOL(class_export_get);
750
751 void class_export_put(struct obd_export *exp)
752 {
753         LASSERT(exp != NULL);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                atomic_read(&exp->exp_refcount) - 1);
756         LASSERT(atomic_read(&exp->exp_refcount) > 0);
757         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758
759         if (atomic_dec_and_test(&exp->exp_refcount)) {
760                 LASSERT(!list_empty(&exp->exp_obd_chain));
761                 CDEBUG(D_IOCTL, "final put %p/%s\n",
762                        exp, exp->exp_client_uuid.uuid);
763                 obd_zombie_export_add(exp);
764         }
765 }
766 EXPORT_SYMBOL(class_export_put);
767
768 /* Creates a new export, adds it to the hash table, and returns a
769  * pointer to it. The refcount is 2: one for the hash reference, and
770  * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772                                     struct obd_uuid *cluuid)
773 {
774         struct obd_export *export;
775         int rc = 0;
776         ENTRY;
777
778         OBD_ALLOC_PTR(export);
779         if (!export)
780                 return ERR_PTR(-ENOMEM);
781
782         export->exp_conn_cnt = 0;
783         export->exp_lock_hash = NULL;
784         atomic_set(&export->exp_refcount, 2);
785         atomic_set(&export->exp_rpc_count, 0);
786         atomic_set(&export->exp_cb_count, 0);
787         atomic_set(&export->exp_locks_count, 0);
788         atomic_set(&export->exp_replay_count, 0);
789         export->exp_obd = obd;
790         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
791         spin_lock_init(&export->exp_uncommitted_replies_lock);
792         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
793         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
794         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
795         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
796         class_handle_hash(&export->exp_handle, export_handle_addref);
797         export->exp_last_request_time = cfs_time_current_sec();
798         spin_lock_init(&export->exp_lock);
799         INIT_HLIST_NODE(&export->exp_uuid_hash);
800         INIT_HLIST_NODE(&export->exp_nid_hash);
801
802         export->exp_sp_peer = LUSTRE_SP_ANY;
803         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
804         export->exp_client_uuid = *cluuid;
805         obd_init_export(export);
806
807         spin_lock(&obd->obd_dev_lock);
808          /* shouldn't happen, but might race */
809         if (obd->obd_stopping)
810                 GOTO(exit_err, rc = -ENODEV);
811
812         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
813                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
814                                             &export->exp_uuid_hash);
815                 if (rc != 0) {
816                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
817                                       obd->obd_name, cluuid->uuid, rc);
818                         GOTO(exit_err, rc = -EALREADY);
819                 }
820         }
821
822         class_incref(obd, "export", export);
823         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
824         list_add_tail(&export->exp_obd_chain_timed,
825                       &export->exp_obd->obd_exports_timed);
826         export->exp_obd->obd_num_exports++;
827         spin_unlock(&obd->obd_dev_lock);
828         RETURN(export);
829
830 exit_err:
831         spin_unlock(&obd->obd_dev_lock);
832         class_handle_unhash(&export->exp_handle);
833         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
834         obd_destroy_export(export);
835         OBD_FREE_PTR(export);
836         return ERR_PTR(rc);
837 }
838 EXPORT_SYMBOL(class_new_export);
839
840 void class_unlink_export(struct obd_export *exp)
841 {
842         class_handle_unhash(&exp->exp_handle);
843
844         spin_lock(&exp->exp_obd->obd_dev_lock);
845         /* delete an uuid-export hashitem from hashtables */
846         if (!hlist_unhashed(&exp->exp_uuid_hash))
847                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
848                                 &exp->exp_client_uuid,
849                                 &exp->exp_uuid_hash);
850
851         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
852         list_del_init(&exp->exp_obd_chain_timed);
853         exp->exp_obd->obd_num_exports--;
854         spin_unlock(&exp->exp_obd->obd_dev_lock);
855         class_export_put(exp);
856 }
857 EXPORT_SYMBOL(class_unlink_export);
858
859 /* Import management functions */
860 void class_import_destroy(struct obd_import *imp)
861 {
862         ENTRY;
863
864         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
865                 imp->imp_obd->obd_name);
866
867         LASSERT(atomic_read(&imp->imp_refcount) == 0);
868
869         ptlrpc_put_connection_superhack(imp->imp_connection);
870
871         while (!list_empty(&imp->imp_conn_list)) {
872                 struct obd_import_conn *imp_conn;
873
874                 imp_conn = list_entry(imp->imp_conn_list.next,
875                                       struct obd_import_conn, oic_item);
876                 list_del_init(&imp_conn->oic_item);
877                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
878                 OBD_FREE(imp_conn, sizeof(*imp_conn));
879         }
880
881         LASSERT(imp->imp_sec == NULL);
882         class_decref(imp->imp_obd, "import", imp);
883         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
884         EXIT;
885 }
886
/**
 * Handle-hash addref callback for imports (registered in
 * class_new_import()): takes one more reference on the import.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
891
892 struct obd_import *class_import_get(struct obd_import *import)
893 {
894         LASSERT(atomic_read(&import->imp_refcount) >= 0);
895         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
896         atomic_inc(&import->imp_refcount);
897         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
898                atomic_read(&import->imp_refcount),
899                import->imp_obd->obd_name);
900         return import;
901 }
902 EXPORT_SYMBOL(class_import_get);
903
904 void class_import_put(struct obd_import *imp)
905 {
906         ENTRY;
907
908         LASSERT(atomic_read(&imp->imp_refcount) > 0);
909         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
910         LASSERT(list_empty(&imp->imp_zombie_chain));
911
912         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
913                atomic_read(&imp->imp_refcount) - 1,
914                imp->imp_obd->obd_name);
915
916         if (atomic_dec_and_test(&imp->imp_refcount)) {
917                 CDEBUG(D_INFO, "final put import %p\n", imp);
918                 obd_zombie_import_add(imp);
919         }
920
921         EXIT;
922 }
923 EXPORT_SYMBOL(class_import_put);
924
925 static void init_imp_at(struct imp_at *at) {
926         int i;
927         at_init(&at->iat_net_latency, 0, 0);
928         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
929                 /* max service estimates are tracked on the server side, so
930                    don't use the AT history here, just use the last reported
931                    val. (But keep hist for proc histogram, worst_ever) */
932                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
933                         AT_FLG_NOHIST);
934         }
935 }
936
937 struct obd_import *class_new_import(struct obd_device *obd)
938 {
939         struct obd_import *imp;
940
941         OBD_ALLOC(imp, sizeof(*imp));
942         if (imp == NULL)
943                 return NULL;
944
945         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
946         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
947         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
948         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
949         spin_lock_init(&imp->imp_lock);
950         imp->imp_last_success_conn = 0;
951         imp->imp_state = LUSTRE_IMP_NEW;
952         imp->imp_obd = class_incref(obd, "import", imp);
953         sema_init(&imp->imp_sec_mutex, 1);
954         cfs_waitq_init(&imp->imp_recovery_waitq);
955
956         atomic_set(&imp->imp_refcount, 2);
957         atomic_set(&imp->imp_unregistering, 0);
958         atomic_set(&imp->imp_inflight, 0);
959         atomic_set(&imp->imp_replay_inflight, 0);
960         atomic_set(&imp->imp_inval_count, 0);
961         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
962         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
963         class_handle_hash(&imp->imp_handle, import_handle_addref);
964         init_imp_at(&imp->imp_at);
965
966         /* the default magic is V2, will be used in connect RPC, and
967          * then adjusted according to the flags in request/reply. */
968         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
969
970         return imp;
971 }
972 EXPORT_SYMBOL(class_new_import);
973
974 void class_destroy_import(struct obd_import *import)
975 {
976         LASSERT(import != NULL);
977         LASSERT(import != LP_POISON);
978
979         class_handle_unhash(&import->imp_handle);
980
981         spin_lock(&import->imp_lock);
982         import->imp_generation++;
983         spin_unlock(&import->imp_lock);
984         class_import_put(import);
985 }
986 EXPORT_SYMBOL(class_destroy_import);
987
988 /* A connection defines an export context in which preallocation can
989    be managed. This releases the export pointer reference, and returns
990    the export handle, so the export refcount is 1 when this function
991    returns. */
992 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
993                   struct obd_uuid *cluuid)
994 {
995         struct obd_export *export;
996         LASSERT(conn != NULL);
997         LASSERT(obd != NULL);
998         LASSERT(cluuid != NULL);
999         ENTRY;
1000
1001         export = class_new_export(obd, cluuid);
1002         if (IS_ERR(export))
1003                 RETURN(PTR_ERR(export));
1004
1005         conn->cookie = export->exp_handle.h_cookie;
1006         class_export_put(export);
1007
1008         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1009                cluuid->uuid, conn->cookie);
1010         RETURN(0);
1011 }
1012 EXPORT_SYMBOL(class_connect);
1013
1014 /* if export is involved in recovery then clean up related things */
1015 void class_export_recovery_cleanup(struct obd_export *exp)
1016 {
1017         struct obd_device *obd = exp->exp_obd;
1018
1019         spin_lock_bh(&obd->obd_processing_task_lock);
1020         if (exp->exp_delayed)
1021                 obd->obd_delayed_clients--;
1022         if (obd->obd_recovering && exp->exp_in_recovery) {
1023                 spin_lock(&exp->exp_lock);
1024                 exp->exp_in_recovery = 0;
1025                 spin_unlock(&exp->exp_lock);
1026                 LASSERT(obd->obd_connected_clients);
1027                 obd->obd_connected_clients--;
1028         }
1029         /** Cleanup req replay fields */
1030         if (exp->exp_req_replay_needed) {
1031                 spin_lock(&exp->exp_lock);
1032                 exp->exp_req_replay_needed = 0;
1033                 spin_unlock(&exp->exp_lock);
1034                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1035                 atomic_dec(&obd->obd_req_replay_clients);
1036         }
1037         /** Cleanup lock replay data */
1038         if (exp->exp_lock_replay_needed) {
1039                 spin_lock(&exp->exp_lock);
1040                 exp->exp_lock_replay_needed = 0;
1041                 spin_unlock(&exp->exp_lock);
1042                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1043                 atomic_dec(&obd->obd_lock_replay_clients);
1044         }
1045         spin_unlock_bh(&obd->obd_processing_task_lock);
1046 }
1047
1048 /* This function removes 1-3 references from the export:
1049  * 1 - for export pointer passed
1050  * and if disconnect really need
1051  * 2 - removing from hash
1052  * 3 - in client_unlink_export
1053  * The export pointer passed to this function can destroyed */
1054 int class_disconnect(struct obd_export *export)
1055 {
1056         int already_disconnected;
1057         ENTRY;
1058
1059         if (export == NULL) {
1060                 fixme();
1061                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1062                 RETURN(-EINVAL);
1063         }
1064
1065         spin_lock(&export->exp_lock);
1066         already_disconnected = export->exp_disconnected;
1067         export->exp_disconnected = 1;
1068         spin_unlock(&export->exp_lock);
1069
1070         /* class_cleanup(), abort_recovery(), and class_fail_export()
1071          * all end up in here, and if any of them race we shouldn't
1072          * call extra class_export_puts(). */
1073         if (already_disconnected) {
1074                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1075                 GOTO(no_disconn, already_disconnected);
1076         }
1077
1078         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1079                export->exp_handle.h_cookie);
1080
1081         if (!hlist_unhashed(&export->exp_nid_hash))
1082                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1083                                 &export->exp_connection->c_peer.nid,
1084                                 &export->exp_nid_hash);
1085
1086         class_export_recovery_cleanup(export);
1087         class_unlink_export(export);
1088 no_disconn:
1089         class_export_put(export);
1090         RETURN(0);
1091 }
1092
1093 static void class_disconnect_export_list(struct list_head *list,
1094                                          enum obd_option flags)
1095 {
1096         int rc;
1097         struct obd_export *exp;
1098         ENTRY;
1099
1100         /* It's possible that an export may disconnect itself, but
1101          * nothing else will be added to this list. */
1102         while (!list_empty(list)) {
1103                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1104                 /* need for safe call CDEBUG after obd_disconnect */
1105                 class_export_get(exp);
1106
1107                 spin_lock(&exp->exp_lock);
1108                 exp->exp_flags = flags;
1109                 spin_unlock(&exp->exp_lock);
1110
1111                 if (obd_uuid_equals(&exp->exp_client_uuid,
1112                                     &exp->exp_obd->obd_uuid)) {
1113                         CDEBUG(D_HA,
1114                                "exp %p export uuid == obd uuid, don't discon\n",
1115                                exp);
1116                         /* Need to delete this now so we don't end up pointing
1117                          * to work_list later when this export is cleaned up. */
1118                         list_del_init(&exp->exp_obd_chain);
1119                         class_export_put(exp);
1120                         continue;
1121                 }
1122
1123                 class_export_get(exp);
1124                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1125                        "last request at "CFS_TIME_T"\n",
1126                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1127                        exp, exp->exp_last_request_time);
1128                 /* release one export reference anyway */
1129                 rc = obd_disconnect(exp);
1130
1131                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1132                        obd_export_nid2str(exp), exp, rc);
1133                 class_export_put(exp);
1134         }
1135         EXIT;
1136 }
1137
1138 void class_disconnect_exports(struct obd_device *obd)
1139 {
1140         struct list_head work_list;
1141         ENTRY;
1142
1143         /* Move all of the exports from obd_exports to a work list, en masse. */
1144         CFS_INIT_LIST_HEAD(&work_list);
1145         spin_lock(&obd->obd_dev_lock);
1146         list_splice_init(&obd->obd_exports, &work_list);
1147         list_splice_init(&obd->obd_delayed_exports, &work_list);
1148         spin_unlock(&obd->obd_dev_lock);
1149
1150         if (!list_empty(&work_list)) {
1151                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1152                        "disconnecting them\n", obd->obd_minor, obd);
1153                 class_disconnect_export_list(&work_list,
1154                                              exp_flags_from_obd(obd));
1155         } else
1156                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1157                        obd->obd_minor, obd);
1158         EXIT;
1159 }
1160 EXPORT_SYMBOL(class_disconnect_exports);
1161
1162 /* Remove exports that have not completed recovery.
1163  */
1164 void class_disconnect_stale_exports(struct obd_device *obd,
1165                                     int (*test_export)(struct obd_export *))
1166 {
1167         struct list_head work_list;
1168         struct list_head *pos, *n;
1169         struct obd_export *exp;
1170         int evicted = 0;
1171         ENTRY;
1172
1173         CFS_INIT_LIST_HEAD(&work_list);
1174         spin_lock(&obd->obd_dev_lock);
1175         list_for_each_safe(pos, n, &obd->obd_exports) {
1176                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1177                 if (test_export(exp))
1178                         continue;
1179
1180                 /* don't count self-export as client */
1181                 if (obd_uuid_equals(&exp->exp_client_uuid,
1182                                     &exp->exp_obd->obd_uuid))
1183                         continue;
1184
1185                 list_move(&exp->exp_obd_chain, &work_list);
1186                 evicted++;
1187                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1188                        obd->obd_name, exp->exp_client_uuid.uuid,
1189                        exp->exp_connection == NULL ? "<unknown>" :
1190                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1191                 print_export_data(exp, "EVICTING");
1192         }
1193         spin_unlock(&obd->obd_dev_lock);
1194
1195         if (evicted) {
1196                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1197                        obd->obd_name, evicted);
1198                 obd->obd_stale_clients += evicted;
1199         }
1200         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1201                                                  OBD_OPT_ABORT_RECOV);
1202         EXIT;
1203 }
1204 EXPORT_SYMBOL(class_disconnect_stale_exports);
1205
1206 void class_fail_export(struct obd_export *exp)
1207 {
1208         int rc, already_failed;
1209
1210         spin_lock(&exp->exp_lock);
1211         already_failed = exp->exp_failed;
1212         exp->exp_failed = 1;
1213         spin_unlock(&exp->exp_lock);
1214
1215         if (already_failed) {
1216                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1217                        exp, exp->exp_client_uuid.uuid);
1218                 return;
1219         }
1220
1221         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1222                exp, exp->exp_client_uuid.uuid);
1223
1224         if (obd_dump_on_timeout)
1225                 libcfs_debug_dumplog();
1226
1227         /* Most callers into obd_disconnect are removing their own reference
1228          * (request, for example) in addition to the one from the hash table.
1229          * We don't have such a reference here, so make one. */
1230         class_export_get(exp);
1231         rc = obd_disconnect(exp);
1232         if (rc)
1233                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1234         else
1235                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1236                        exp, exp->exp_client_uuid.uuid);
1237 }
1238 EXPORT_SYMBOL(class_fail_export);
1239
1240 char *obd_export_nid2str(struct obd_export *exp)
1241 {
1242         if (exp->exp_connection != NULL)
1243                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1244
1245         return "(no nid)";
1246 }
1247 EXPORT_SYMBOL(obd_export_nid2str);
1248
1249 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1250 {
1251         struct obd_export *doomed_exp = NULL;
1252         int exports_evicted = 0;
1253
1254         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1255
1256         do {
1257                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1258                 if (doomed_exp == NULL)
1259                         break;
1260
1261                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1262                          "nid %s found, wanted nid %s, requested nid %s\n",
1263                          obd_export_nid2str(doomed_exp),
1264                          libcfs_nid2str(nid_key), nid);
1265                 LASSERTF(doomed_exp != obd->obd_self_export,
1266                          "self-export is hashed by NID?\n");
1267                 exports_evicted++;
1268                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1269                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1270                        exports_evicted);
1271                 class_fail_export(doomed_exp);
1272                 class_export_put(doomed_exp);
1273         } while (1);
1274
1275         if (!exports_evicted)
1276                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1277                        obd->obd_name, nid);
1278         return exports_evicted;
1279 }
1280 EXPORT_SYMBOL(obd_export_evict_by_nid);
1281
1282 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1283 {
1284         struct obd_export *doomed_exp = NULL;
1285         struct obd_uuid doomed_uuid;
1286         int exports_evicted = 0;
1287
1288         obd_str2uuid(&doomed_uuid, uuid);
1289         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1290                 CERROR("%s: can't evict myself\n", obd->obd_name);
1291                 return exports_evicted;
1292         }
1293
1294         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1295
1296         if (doomed_exp == NULL) {
1297                 CERROR("%s: can't disconnect %s: no exports found\n",
1298                        obd->obd_name, uuid);
1299         } else {
1300                 CWARN("%s: evicting %s at adminstrative request\n",
1301                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1302                 class_fail_export(doomed_exp);
1303                 class_export_put(doomed_exp);
1304                 exports_evicted++;
1305         }
1306
1307         return exports_evicted;
1308 }
1309 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1310
1311 static void print_export_data(struct obd_export *exp, const char *status)
1312 {
1313         struct ptlrpc_reply_state *rs;
1314         struct ptlrpc_reply_state *first_reply = NULL;
1315         int nreplies = 0;
1316
1317         spin_lock(&exp->exp_lock);
1318         list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1319                 if (nreplies == 0)
1320                         first_reply = rs;
1321                 nreplies++;
1322         }
1323         spin_unlock(&exp->exp_lock);
1324
1325         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1326                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1327                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1328                atomic_read(&exp->exp_rpc_count),
1329                atomic_read(&exp->exp_cb_count),
1330                atomic_read(&exp->exp_locks_count),
1331                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1332                nreplies, first_reply, nreplies > 3 ? "..." : "",
1333                exp->exp_last_committed);
1334 }
1335
1336 void dump_exports(struct obd_device *obd)
1337 {
1338         struct obd_export *exp;
1339
1340         spin_lock(&obd->obd_dev_lock);
1341         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1342                 print_export_data(exp, "ACTIVE");
1343         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1344                 print_export_data(exp, "UNLINKED");
1345         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1346                 print_export_data(exp, "DELAYED");
1347         spin_unlock(&obd->obd_dev_lock);
1348         spin_lock(&obd_zombie_impexp_lock);
1349         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1350                 print_export_data(exp, "ZOMBIE");
1351         spin_unlock(&obd_zombie_impexp_lock);
1352 }
1353 EXPORT_SYMBOL(dump_exports);
1354
1355 void obd_exports_barrier(struct obd_device *obd)
1356 {
1357         int waited = 2;
1358         LASSERT(list_empty(&obd->obd_exports));
1359         spin_lock(&obd->obd_dev_lock);
1360         while (!list_empty(&obd->obd_unlinked_exports)) {
1361                 spin_unlock(&obd->obd_dev_lock);
1362                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1363                 if (waited > 5 && IS_PO2(waited)) {
1364                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1365                                       "more than %d seconds. "
1366                                       "The obd refcount = %d. Is it stuck?\n",
1367                                       obd->obd_name, waited,
1368                                       atomic_read(&obd->obd_refcount));
1369                         dump_exports(obd);
1370                 }
1371                 waited *= 2;
1372                 spin_lock(&obd->obd_dev_lock);
1373         }
1374         spin_unlock(&obd->obd_dev_lock);
1375 }
1376 EXPORT_SYMBOL(obd_exports_barrier);
1377
1378 /**
1379  * kill zombie imports and exports
1380  */
1381 void obd_zombie_impexp_cull(void)
1382 {
1383         struct obd_import *import;
1384         struct obd_export *export;
1385         ENTRY;
1386
1387         do {
1388                 spin_lock(&obd_zombie_impexp_lock);
1389
1390                 import = NULL;
1391                 if (!list_empty(&obd_zombie_imports)) {
1392                         import = list_entry(obd_zombie_imports.next,
1393                                             struct obd_import,
1394                                             imp_zombie_chain);
1395                         list_del_init(&import->imp_zombie_chain);
1396                 }
1397
1398                 export = NULL;
1399                 if (!list_empty(&obd_zombie_exports)) {
1400                         export = list_entry(obd_zombie_exports.next,
1401                                             struct obd_export,
1402                                             exp_obd_chain);
1403                         list_del_init(&export->exp_obd_chain);
1404                 }
1405
1406                 spin_unlock(&obd_zombie_impexp_lock);
1407
1408                 if (import != NULL)
1409                         class_import_destroy(import);
1410
1411                 if (export != NULL)
1412                         class_export_destroy(export);
1413
1414         } while (import != NULL || export != NULL);
1415         EXIT;
1416 }
1417
1418 static struct completion        obd_zombie_start;
1419 static struct completion        obd_zombie_stop;
1420 static unsigned long            obd_zombie_flags;
1421 static cfs_waitq_t              obd_zombie_waitq;
1422 static pid_t                    obd_zombie_pid;
1423
1424 enum {
1425         OBD_ZOMBIE_STOP   = 1 << 1
1426 };
1427
1428 /**
1429  * check for work for kill zombie import/export thread.
1430  */
1431 static int obd_zombie_impexp_check(void *arg)
1432 {
1433         int rc;
1434
1435         spin_lock(&obd_zombie_impexp_lock);
1436         rc = list_empty(&obd_zombie_imports) &&
1437              list_empty(&obd_zombie_exports) &&
1438              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1439
1440         spin_unlock(&obd_zombie_impexp_lock);
1441
1442         RETURN(rc);
1443 }
1444
1445 /**
1446  * Add export to the obd_zombe thread and notify it.
1447  */
1448 static void obd_zombie_export_add(struct obd_export *exp) {
1449         spin_lock(&exp->exp_obd->obd_dev_lock);
1450         LASSERT(!list_empty(&exp->exp_obd_chain));
1451         list_del_init(&exp->exp_obd_chain);
1452         spin_unlock(&exp->exp_obd->obd_dev_lock);
1453         spin_lock(&obd_zombie_impexp_lock);
1454         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1455         spin_unlock(&obd_zombie_impexp_lock);
1456
1457         if (obd_zombie_impexp_notify != NULL)
1458                 obd_zombie_impexp_notify();
1459 }
1460
1461 /**
1462  * Add import to the obd_zombe thread and notify it.
1463  */
1464 static void obd_zombie_import_add(struct obd_import *imp) {
1465         LASSERT(imp->imp_sec == NULL);
1466         spin_lock(&obd_zombie_impexp_lock);
1467         LASSERT(list_empty(&imp->imp_zombie_chain));
1468         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1469         spin_unlock(&obd_zombie_impexp_lock);
1470
1471         if (obd_zombie_impexp_notify != NULL)
1472                 obd_zombie_impexp_notify();
1473 }
1474
1475 /**
1476  * notify import/export destroy thread about new zombie.
1477  */
1478 static void obd_zombie_impexp_notify(void)
1479 {
1480         cfs_waitq_signal(&obd_zombie_waitq);
1481 }
1482
1483 /**
1484  * check whether obd_zombie is idle
1485  */
1486 static int obd_zombie_is_idle(void)
1487 {
1488         int rc;
1489
1490         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1491         spin_lock(&obd_zombie_impexp_lock);
1492         rc = list_empty(&obd_zombie_imports) &&
1493              list_empty(&obd_zombie_exports);
1494         spin_unlock(&obd_zombie_impexp_lock);
1495         return rc;
1496 }
1497
1498 /**
1499  * wait when obd_zombie import/export queues become empty
1500  */
1501 void obd_zombie_barrier(void)
1502 {
1503         struct l_wait_info lwi = { 0 };
1504
1505         if (obd_zombie_pid == cfs_curproc_pid())
1506                 /* don't wait for myself */
1507                 return;
1508         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1509 }
1510 EXPORT_SYMBOL(obd_zombie_barrier);
1511
1512 #ifdef __KERNEL__
1513
1514 /**
1515  * destroy zombie export/import thread.
1516  */
1517 static int obd_zombie_impexp_thread(void *unused)
1518 {
1519         int rc;
1520
1521         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1522                 complete(&obd_zombie_start);
1523                 RETURN(rc);
1524         }
1525
1526         complete(&obd_zombie_start);
1527
1528         obd_zombie_pid = cfs_curproc_pid();
1529
1530         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1531                 struct l_wait_info lwi = { 0 };
1532
1533                 l_wait_event(obd_zombie_waitq,
1534                              !obd_zombie_impexp_check(NULL), &lwi);
1535                 obd_zombie_impexp_cull();
1536
1537                 /*
1538                  * Notify obd_zombie_barrier callers that queues
1539                  * may be empty.
1540                  */
1541                 cfs_waitq_signal(&obd_zombie_waitq);
1542         }
1543
1544         complete(&obd_zombie_stop);
1545
1546         RETURN(0);
1547 }
1548
1549 #else /* ! KERNEL */
1550
1551 static atomic_t zombie_recur = ATOMIC_INIT(0);
1552 static void *obd_zombie_impexp_work_cb;
1553 static void *obd_zombie_impexp_idle_cb;
1554
1555 int obd_zombie_impexp_kill(void *arg)
1556 {
1557         int rc = 0;
1558
1559         if (atomic_inc_return(&zombie_recur) == 1) {
1560                 obd_zombie_impexp_cull();
1561                 rc = 1;
1562         }
1563         atomic_dec(&zombie_recur);
1564         return rc;
1565 }
1566
1567 #endif
1568
1569 /**
1570  * start destroy zombie import/export thread
1571  */
1572 int obd_zombie_impexp_init(void)
1573 {
1574         int rc;
1575
1576         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1577         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1578         spin_lock_init(&obd_zombie_impexp_lock);
1579         init_completion(&obd_zombie_start);
1580         init_completion(&obd_zombie_stop);
1581         cfs_waitq_init(&obd_zombie_waitq);
1582         obd_zombie_pid = 0;
1583
1584 #ifdef __KERNEL__
1585         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1586         if (rc < 0)
1587                 RETURN(rc);
1588
1589         wait_for_completion(&obd_zombie_start);
1590 #else
1591
1592         obd_zombie_impexp_work_cb =
1593                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1594                                                  &obd_zombie_impexp_kill, NULL);
1595
1596         obd_zombie_impexp_idle_cb =
1597                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1598                                                  &obd_zombie_impexp_check, NULL);
1599         rc = 0;
1600 #endif
1601         RETURN(rc);
1602 }
1603 /**
1604  * stop destroy zombie import/export thread
1605  */
1606 void obd_zombie_impexp_stop(void)
1607 {
1608         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1609         obd_zombie_impexp_notify();
1610 #ifdef __KERNEL__
1611         wait_for_completion(&obd_zombie_stop);
1612 #else
1613         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1614         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1615 #endif
1616 }
1617