Whamcloud - gitweb
d41576cb6648d51b0ddc1f1e7f25c2c4f6f9d11e
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83 EXPORT_SYMBOL(obd_device_alloc);
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115
116 struct obd_type *class_get_type(const char *name)
117 {
118         struct obd_type *type = class_search_type(name);
119
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
121         if (!type) {
122                 const char *modname = name;
123                 if (!cfs_request_module("%s", modname)) {
124                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125                         type = class_search_type(name);
126                 } else {
127                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
128                                            modname);
129                 }
130         }
131 #endif
132         if (type) {
133                 cfs_spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 cfs_try_module_get(type->typ_dt_ops->o_owner);
136                 cfs_spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149
150 #define CLASS_MAX_NAME 1024
151
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153                         struct lprocfs_vars *vars, const char *name,
154                         struct lu_device_type *ldt)
155 {
156         struct obd_type *type;
157         int rc = 0;
158         ENTRY;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 RETURN(-EEXIST);
166         }
167
168         rc = -ENOMEM;
169         OBD_ALLOC(type, sizeof(*type));
170         if (type == NULL)
171                 RETURN(rc);
172
173         OBD_ALLOC_PTR(type->typ_dt_ops);
174         OBD_ALLOC_PTR(type->typ_md_ops);
175         OBD_ALLOC(type->typ_name, strlen(name) + 1);
176
177         if (type->typ_dt_ops == NULL ||
178             type->typ_md_ops == NULL ||
179             type->typ_name == NULL)
180                 GOTO (failed, rc);
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         cfs_spin_lock_init(&type->obd_type_lock);
188
189 #ifdef LPROCFS
190         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191                                               vars, type);
192         if (IS_ERR(type->typ_procroot)) {
193                 rc = PTR_ERR(type->typ_procroot);
194                 type->typ_procroot = NULL;
195                 GOTO (failed, rc);
196         }
197 #endif
198         if (ldt != NULL) {
199                 type->typ_lu = ldt;
200                 rc = lu_device_type_init(ldt);
201                 if (rc != 0)
202                         GOTO (failed, rc);
203         }
204
205         cfs_spin_lock(&obd_types_lock);
206         cfs_list_add(&type->typ_chain, &obd_types);
207         cfs_spin_unlock(&obd_types_lock);
208
209         RETURN (0);
210
211  failed:
212         if (type->typ_name != NULL)
213                 OBD_FREE(type->typ_name, strlen(name) + 1);
214         if (type->typ_md_ops != NULL)
215                 OBD_FREE_PTR(type->typ_md_ops);
216         if (type->typ_dt_ops != NULL)
217                 OBD_FREE_PTR(type->typ_dt_ops);
218         OBD_FREE(type, sizeof(*type));
219         RETURN(rc);
220 }
221
222 int class_unregister_type(const char *name)
223 {
224         struct obd_type *type = class_search_type(name);
225         ENTRY;
226
227         if (!type) {
228                 CERROR("unknown obd type\n");
229                 RETURN(-EINVAL);
230         }
231
232         if (type->typ_refcnt) {
233                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234                 /* This is a bad situation, let's make the best of it */
235                 /* Remove ops, but leave the name for debugging */
236                 OBD_FREE_PTR(type->typ_dt_ops);
237                 OBD_FREE_PTR(type->typ_md_ops);
238                 RETURN(-EBUSY);
239         }
240
241         if (type->typ_procroot) {
242                 lprocfs_remove(&type->typ_procroot);
243         }
244
245         if (type->typ_lu)
246                 lu_device_type_fini(type->typ_lu);
247
248         cfs_spin_lock(&obd_types_lock);
249         cfs_list_del(&type->typ_chain);
250         cfs_spin_unlock(&obd_types_lock);
251         OBD_FREE(type->typ_name, strlen(name) + 1);
252         if (type->typ_dt_ops != NULL)
253                 OBD_FREE_PTR(type->typ_dt_ops);
254         if (type->typ_md_ops != NULL)
255                 OBD_FREE_PTR(type->typ_md_ops);
256         OBD_FREE(type, sizeof(*type));
257         RETURN(0);
258 } /* class_unregister_type */
259
260 /**
261  * Create a new obd device.
262  *
263  * Find an empty slot in ::obd_devs[], create a new obd device in it.
264  *
265  * \param[in] type_name obd device type string.
266  * \param[in] name      obd device name.
267  *
268  * \retval NULL if create fails, otherwise return the obd device
269  *         pointer created.
270  */
271 struct obd_device *class_newdev(const char *type_name, const char *name)
272 {
273         struct obd_device *result = NULL;
274         struct obd_device *newdev;
275         struct obd_type *type = NULL;
276         int i;
277         int new_obd_minor = 0;
278
279         if (strlen(name) >= MAX_OBD_NAME) {
280                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281                 RETURN(ERR_PTR(-EINVAL));
282         }
283
284         type = class_get_type(type_name);
285         if (type == NULL){
286                 CERROR("OBD: unknown type: %s\n", type_name);
287                 RETURN(ERR_PTR(-ENODEV));
288         }
289
290         newdev = obd_device_alloc();
291         if (newdev == NULL) {
292                 class_put_type(type);
293                 RETURN(ERR_PTR(-ENOMEM));
294         }
295         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296
297         cfs_spin_lock(&obd_dev_lock);
298         for (i = 0; i < class_devno_max(); i++) {
299                 struct obd_device *obd = class_num2obd(i);
300                 if (obd && obd->obd_name &&
301                     (strcmp(name, obd->obd_name) == 0)) {
302                         CERROR("Device %s already exists, won't add\n", name);
303                         if (result) {
304                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305                                          "%p obd_magic %08x != %08x\n", result,
306                                          result->obd_magic, OBD_DEVICE_MAGIC);
307                                 LASSERTF(result->obd_minor == new_obd_minor,
308                                          "%p obd_minor %d != %d\n", result,
309                                          result->obd_minor, new_obd_minor);
310
311                                 obd_devs[result->obd_minor] = NULL;
312                                 result->obd_name[0]='\0';
313                          }
314                         result = ERR_PTR(-EEXIST);
315                         break;
316                 }
317                 if (!result && !obd) {
318                         result = newdev;
319                         result->obd_minor = i;
320                         new_obd_minor = i;
321                         result->obd_type = type;
322                         strncpy(result->obd_name, name,
323                                 sizeof(result->obd_name) - 1);
324                         obd_devs[i] = result;
325                 }
326         }
327         cfs_spin_unlock(&obd_dev_lock);
328
329         if (result == NULL && i >= class_devno_max()) {
330                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331                        class_devno_max());
332                 result = ERR_PTR(-EOVERFLOW);
333         }
334
335         if (IS_ERR(result)) {
336                 obd_device_free(newdev);
337                 class_put_type(type);
338         } else {
339                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340                        result->obd_name, result);
341         }
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type != NULL);
354
355         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356                obd->obd_name,obd->obd_type->typ_name);
357
358         cfs_spin_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         cfs_spin_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         cfs_spin_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377                         /* Make sure we finished attaching before we give
378                            out any references */
379                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380                         if (obd->obd_attached) {
381                                 cfs_spin_unlock(&obd_dev_lock);
382                                 return i;
383                         }
384                         break;
385                 }
386         }
387         cfs_spin_unlock(&obd_dev_lock);
388
389         return -1;
390 }
391
392 struct obd_device *class_name2obd(const char *name)
393 {
394         int dev = class_name2dev(name);
395
396         if (dev < 0 || dev > class_devno_max())
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 int class_uuid2dev(struct obd_uuid *uuid)
402 {
403         int i;
404
405         cfs_spin_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410                         cfs_spin_unlock(&obd_dev_lock);
411                         return i;
412                 }
413         }
414         cfs_spin_unlock(&obd_dev_lock);
415
416         return -1;
417 }
418
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 {
421         int dev = class_uuid2dev(uuid);
422         if (dev < 0)
423                 return NULL;
424         return class_num2obd(dev);
425 }
426
427 /**
428  * Get obd device from ::obd_devs[]
429  *
430  * \param num [in] array index
431  *
432  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433  *         otherwise return the obd device there.
434  */
435 struct obd_device *class_num2obd(int num)
436 {
437         struct obd_device *obd = NULL;
438
439         if (num < class_devno_max()) {
440                 obd = obd_devs[num];
441                 if (obd == NULL)
442                         return NULL;
443
444                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445                          "%p obd_magic %08x != %08x\n",
446                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447                 LASSERTF(obd->obd_minor == num,
448                          "%p obd_minor %0d != %0d\n",
449                          obd, obd->obd_minor, num);
450         }
451
452         return obd;
453 }
454
455 void class_obd_list(void)
456 {
457         char *status;
458         int i;
459
460         cfs_spin_lock(&obd_dev_lock);
461         for (i = 0; i < class_devno_max(); i++) {
462                 struct obd_device *obd = class_num2obd(i);
463                 if (obd == NULL)
464                         continue;
465                 if (obd->obd_stopping)
466                         status = "ST";
467                 else if (obd->obd_set_up)
468                         status = "UP";
469                 else if (obd->obd_attached)
470                         status = "AT";
471                 else
472                         status = "--";
473                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474                          i, status, obd->obd_type->typ_name,
475                          obd->obd_name, obd->obd_uuid.uuid,
476                          cfs_atomic_read(&obd->obd_refcount));
477         }
478         cfs_spin_unlock(&obd_dev_lock);
479         return;
480 }
481
482 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
483    specified, then only the client with that uuid is returned,
484    otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486                                           const char * typ_name,
487                                           struct obd_uuid *grp_uuid)
488 {
489         int i;
490
491         cfs_spin_lock(&obd_dev_lock);
492         for (i = 0; i < class_devno_max(); i++) {
493                 struct obd_device *obd = class_num2obd(i);
494                 if (obd == NULL)
495                         continue;
496                 if ((strncmp(obd->obd_type->typ_name, typ_name,
497                              strlen(typ_name)) == 0)) {
498                         if (obd_uuid_equals(tgt_uuid,
499                                             &obd->u.cli.cl_target_uuid) &&
500                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
501                                                          &obd->obd_uuid) : 1)) {
502                                 cfs_spin_unlock(&obd_dev_lock);
503                                 return obd;
504                         }
505                 }
506         }
507         cfs_spin_unlock(&obd_dev_lock);
508
509         return NULL;
510 }
511
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513    searching at *next, and if a device is found, the next index to look
514    at is saved in *next. If next is NULL, then the first matching device
515    will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
517 {
518         int i;
519
520         if (next == NULL)
521                 i = 0;
522         else if (*next >= 0 && *next < class_devno_max())
523                 i = *next;
524         else
525                 return NULL;
526
527         cfs_spin_lock(&obd_dev_lock);
528         for (; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530                 if (obd == NULL)
531                         continue;
532                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
533                         if (next != NULL)
534                                 *next = i+1;
535                         cfs_spin_unlock(&obd_dev_lock);
536                         return obd;
537                 }
538         }
539         cfs_spin_unlock(&obd_dev_lock);
540
541         return NULL;
542 }
543
544 /**
545  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546  * adjust sptlrpc settings accordingly.
547  */
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 {
550         struct obd_device  *obd;
551         const char         *type;
552         int                 i, rc = 0, rc2;
553
554         LASSERT(namelen > 0);
555
556         cfs_spin_lock(&obd_dev_lock);
557         for (i = 0; i < class_devno_max(); i++) {
558                 obd = class_num2obd(i);
559
560                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
561                         continue;
562
563                 /* only notify mdc, osc, mdt, ost */
564                 type = obd->obd_type->typ_name;
565                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OST_NAME) != 0)
569                         continue;
570
571                 if (strncmp(obd->obd_name, fsname, namelen))
572                         continue;
573
574                 class_incref(obd, __FUNCTION__, obd);
575                 cfs_spin_unlock(&obd_dev_lock);
576                 rc2 = obd_set_info_async(obd->obd_self_export,
577                                          sizeof(KEY_SPTLRPC_CONF),
578                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
579                 rc = rc ? rc : rc2;
580                 class_decref(obd, __FUNCTION__, obd);
581                 cfs_spin_lock(&obd_dev_lock);
582         }
583         cfs_spin_unlock(&obd_dev_lock);
584         return rc;
585 }
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587
588 void obd_cleanup_caches(void)
589 {
590         int rc;
591
592         ENTRY;
593         if (obd_device_cachep) {
594                 rc = cfs_mem_cache_destroy(obd_device_cachep);
595                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596                 obd_device_cachep = NULL;
597         }
598         if (obdo_cachep) {
599                 rc = cfs_mem_cache_destroy(obdo_cachep);
600                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601                 obdo_cachep = NULL;
602         }
603         if (import_cachep) {
604                 rc = cfs_mem_cache_destroy(import_cachep);
605                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606                 import_cachep = NULL;
607         }
608         if (capa_cachep) {
609                 rc = cfs_mem_cache_destroy(capa_cachep);
610                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
611                 capa_cachep = NULL;
612         }
613         EXIT;
614 }
615
616 int obd_init_caches(void)
617 {
618         ENTRY;
619
620         LASSERT(obd_device_cachep == NULL);
621         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622                                                  sizeof(struct obd_device),
623                                                  0, 0);
624         if (!obd_device_cachep)
625                 GOTO(out, -ENOMEM);
626
627         LASSERT(obdo_cachep == NULL);
628         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
629                                            0, 0);
630         if (!obdo_cachep)
631                 GOTO(out, -ENOMEM);
632
633         LASSERT(import_cachep == NULL);
634         import_cachep = cfs_mem_cache_create("ll_import_cache",
635                                              sizeof(struct obd_import),
636                                              0, 0);
637         if (!import_cachep)
638                 GOTO(out, -ENOMEM);
639
640         LASSERT(capa_cachep == NULL);
641         capa_cachep = cfs_mem_cache_create("capa_cache",
642                                            sizeof(struct obd_capa), 0, 0);
643         if (!capa_cachep)
644                 GOTO(out, -ENOMEM);
645
646         RETURN(0);
647  out:
648         obd_cleanup_caches();
649         RETURN(-ENOMEM);
650
651 }
652
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 {
656         struct obd_export *export;
657         ENTRY;
658
659         if (!conn) {
660                 CDEBUG(D_CACHE, "looking for null handle\n");
661                 RETURN(NULL);
662         }
663
664         if (conn->cookie == -1) {  /* this means assign a new connection */
665                 CDEBUG(D_CACHE, "want a new connection\n");
666                 RETURN(NULL);
667         }
668
669         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670         export = class_handle2object(conn->cookie);
671         RETURN(export);
672 }
673
674 struct obd_device *class_exp2obd(struct obd_export *exp)
675 {
676         if (exp)
677                 return exp->exp_obd;
678         return NULL;
679 }
680
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 {
683         struct obd_export *export;
684         export = class_conn2export(conn);
685         if (export) {
686                 struct obd_device *obd = export->exp_obd;
687                 class_export_put(export);
688                 return obd;
689         }
690         return NULL;
691 }
692
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 {
695         struct obd_device *obd = exp->exp_obd;
696         if (obd == NULL)
697                 return NULL;
698         return obd->u.cli.cl_import;
699 }
700
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 {
703         struct obd_device *obd = class_conn2obd(conn);
704         if (obd == NULL)
705                 return NULL;
706         return obd->u.cli.cl_import;
707 }
708
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
711 {
712         struct obd_device *obd = exp->exp_obd;
713         ENTRY;
714
715         LASSERT (cfs_atomic_read(&exp->exp_refcount) == 0);
716
717         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718                exp->exp_client_uuid.uuid, obd->obd_name);
719
720         LASSERT(obd != NULL);
721
722         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723         if (exp->exp_connection)
724                 ptlrpc_put_connection_superhack(exp->exp_connection);
725
726         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
727         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
728         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
729         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
730         obd_destroy_export(exp);
731         class_decref(obd, "export", exp);
732
733         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
734         EXIT;
735 }
736
/* Handle callback passed to class_handle_hash() for exports: takes one
 * export reference on the object behind the untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
741
742 struct obd_export *class_export_get(struct obd_export *exp)
743 {
744         cfs_atomic_inc(&exp->exp_refcount);
745         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746                cfs_atomic_read(&exp->exp_refcount));
747         return exp;
748 }
749 EXPORT_SYMBOL(class_export_get);
750
751 void class_export_put(struct obd_export *exp)
752 {
753         LASSERT(exp != NULL);
754         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755                cfs_atomic_read(&exp->exp_refcount) - 1);
756         LASSERT(cfs_atomic_read(&exp->exp_refcount) > 0);
757         LASSERT(cfs_atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758
759         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
760                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
761                 CDEBUG(D_IOCTL, "final put %p/%s\n",
762                        exp, exp->exp_client_uuid.uuid);
763                 obd_zombie_export_add(exp);
764         }
765 }
766 EXPORT_SYMBOL(class_export_put);
767
768 /* Creates a new export, adds it to the hash table, and returns a
769  * pointer to it. The refcount is 2: one for the hash reference, and
770  * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772                                     struct obd_uuid *cluuid)
773 {
774         struct obd_export *export;
775         cfs_hash_t *hash = NULL;
776         int rc = 0;
777         ENTRY;
778
779         OBD_ALLOC_PTR(export);
780         if (!export)
781                 return ERR_PTR(-ENOMEM);
782
783         export->exp_conn_cnt = 0;
784         export->exp_lock_hash = NULL;
785         cfs_atomic_set(&export->exp_refcount, 2);
786         cfs_atomic_set(&export->exp_rpc_count, 0);
787         cfs_atomic_set(&export->exp_cb_count, 0);
788         cfs_atomic_set(&export->exp_locks_count, 0);
789 #if LUSTRE_TRACKS_LOCK_EXP_REFS
790         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
791         cfs_spin_lock_init(&export->exp_locks_list_guard);
792 #endif
793         cfs_atomic_set(&export->exp_replay_count, 0);
794         export->exp_obd = obd;
795         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
796         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
797         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
798         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
799         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
800         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
801         class_handle_hash(&export->exp_handle, export_handle_addref);
802         export->exp_last_request_time = cfs_time_current_sec();
803         cfs_spin_lock_init(&export->exp_lock);
804         cfs_spin_lock_init(&export->exp_rpc_lock);
805         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
806         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
807
808         export->exp_sp_peer = LUSTRE_SP_ANY;
809         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
810         export->exp_client_uuid = *cluuid;
811         obd_init_export(export);
812
813         cfs_spin_lock(&obd->obd_dev_lock);
814          /* shouldn't happen, but might race */
815         if (obd->obd_stopping)
816                 GOTO(exit_unlock, rc = -ENODEV);
817
818         hash = cfs_hash_getref(obd->obd_uuid_hash);
819         if (hash == NULL)
820                 GOTO(exit_unlock, rc = -ENODEV);
821         cfs_spin_unlock(&obd->obd_dev_lock);
822
823         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
824                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
825                 if (rc != 0) {
826                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
827                                       obd->obd_name, cluuid->uuid, rc);
828                         GOTO(exit_err, rc = -EALREADY);
829                 }
830         }
831
832         cfs_spin_lock(&obd->obd_dev_lock);
833         if (obd->obd_stopping) {
834                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
835                 GOTO(exit_unlock, rc = -ENODEV);
836         }
837
838         class_incref(obd, "export", export);
839         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
840         cfs_list_add_tail(&export->exp_obd_chain_timed,
841                           &export->exp_obd->obd_exports_timed);
842         export->exp_obd->obd_num_exports++;
843         cfs_spin_unlock(&obd->obd_dev_lock);
844         cfs_hash_putref(hash);
845         RETURN(export);
846
847 exit_unlock:
848         cfs_spin_unlock(&obd->obd_dev_lock);
849 exit_err:
850         if (hash)
851                 cfs_hash_putref(hash);
852         class_handle_unhash(&export->exp_handle);
853         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
854         obd_destroy_export(export);
855         OBD_FREE_PTR(export);
856         return ERR_PTR(rc);
857 }
858 EXPORT_SYMBOL(class_new_export);
859
860 void class_unlink_export(struct obd_export *exp)
861 {
862         class_handle_unhash(&exp->exp_handle);
863
864         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
865         /* delete an uuid-export hashitem from hashtables */
866         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
867                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
868                              &exp->exp_client_uuid,
869                              &exp->exp_uuid_hash);
870
871         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
872         cfs_list_del_init(&exp->exp_obd_chain_timed);
873         exp->exp_obd->obd_num_exports--;
874         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
875         class_export_put(exp);
876 }
877 EXPORT_SYMBOL(class_unlink_export);
878
879 /* Import management functions */
/**
 * Final teardown of an import whose reference count has dropped to zero.
 *
 * Releases the import's current connection and every entry queued on
 * imp_conn_list, drops the "import" reference held on the owning device
 * and finally frees the import through OBD_FREE_RCU().
 *
 * \param imp  import to destroy; imp_refcount must be 0 and imp_sec NULL
 */
void class_import_destroy(struct obd_import *imp)
{
        ENTRY;

        CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
                imp->imp_obd->obd_name);

        LASSERT(cfs_atomic_read(&imp->imp_refcount) == 0);

        ptlrpc_put_connection_superhack(imp->imp_connection);

        /* pop entries off the head of the connection list until it is
         * empty, releasing each connection and freeing its list entry */
        while (!cfs_list_empty(&imp->imp_conn_list)) {
                struct obd_import_conn *imp_conn;

                imp_conn = cfs_list_entry(imp->imp_conn_list.next,
                                          struct obd_import_conn, oic_item);
                cfs_list_del_init(&imp_conn->oic_item);
                ptlrpc_put_connection_superhack(imp_conn->oic_conn);
                OBD_FREE(imp_conn, sizeof(*imp_conn));
        }

        LASSERT(imp->imp_sec == NULL);
        /* matches the class_incref(obd, "import", imp) in class_new_import() */
        class_decref(imp->imp_obd, "import", imp);
        OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
        EXIT;
}
906
/**
 * Handle-hash addref callback for imports (registered through
 * class_handle_hash() in class_new_import()): takes one import reference.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
911
/**
 * Take one additional reference on \a import.
 *
 * \retval the \a import pointer that was passed in
 */
struct obd_import *class_import_get(struct obd_import *import)
{
        /* sanity-check the counter before touching it
         * NOTE(review): 0x5a5a5a looks like a poison-pattern guard against
         * use of freed memory -- confirm */
        LASSERT(cfs_atomic_read(&import->imp_refcount) >= 0);
        LASSERT(cfs_atomic_read(&import->imp_refcount) < 0x5a5a5a);
        cfs_atomic_inc(&import->imp_refcount);
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
               cfs_atomic_read(&import->imp_refcount),
               import->imp_obd->obd_name);
        return import;
}
EXPORT_SYMBOL(class_import_get);
923
/**
 * Drop one reference on \a imp.
 *
 * When the last reference goes away the import is not destroyed inline;
 * it is queued via obd_zombie_import_add() for later destruction.
 *
 * \param imp  import with a positive reference count that is not already
 *             on a zombie list
 */
void class_import_put(struct obd_import *imp)
{
        ENTRY;

        LASSERT(cfs_atomic_read(&imp->imp_refcount) > 0);
        LASSERT(cfs_atomic_read(&imp->imp_refcount) < 0x5a5a5a);
        LASSERT(cfs_list_empty(&imp->imp_zombie_chain));

        /* log the value the counter will have after this put */
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
               cfs_atomic_read(&imp->imp_refcount) - 1,
               imp->imp_obd->obd_name);

        if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
                CDEBUG(D_INFO, "final put import %p\n", imp);
                obd_zombie_import_add(imp);
        }

        EXIT;
}
EXPORT_SYMBOL(class_import_put);
944
945 static void init_imp_at(struct imp_at *at) {
946         int i;
947         at_init(&at->iat_net_latency, 0, 0);
948         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
949                 /* max service estimates are tracked on the server side, so
950                    don't use the AT history here, just use the last reported
951                    val. (But keep hist for proc histogram, worst_ever) */
952                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
953                         AT_FLG_NOHIST);
954         }
955 }
956
957 struct obd_import *class_new_import(struct obd_device *obd)
958 {
959         struct obd_import *imp;
960
961         OBD_ALLOC(imp, sizeof(*imp));
962         if (imp == NULL)
963                 return NULL;
964
965         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
966         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
967         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
968         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
969         cfs_spin_lock_init(&imp->imp_lock);
970         imp->imp_last_success_conn = 0;
971         imp->imp_state = LUSTRE_IMP_NEW;
972         imp->imp_obd = class_incref(obd, "import", imp);
973         cfs_sema_init(&imp->imp_sec_mutex, 1);
974         cfs_waitq_init(&imp->imp_recovery_waitq);
975
976         cfs_atomic_set(&imp->imp_refcount, 2);
977         cfs_atomic_set(&imp->imp_unregistering, 0);
978         cfs_atomic_set(&imp->imp_inflight, 0);
979         cfs_atomic_set(&imp->imp_replay_inflight, 0);
980         cfs_atomic_set(&imp->imp_inval_count, 0);
981         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
982         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
983         class_handle_hash(&imp->imp_handle, import_handle_addref);
984         init_imp_at(&imp->imp_at);
985
986         /* the default magic is V2, will be used in connect RPC, and
987          * then adjusted according to the flags in request/reply. */
988         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
989
990         return imp;
991 }
992 EXPORT_SYMBOL(class_new_import);
993
/**
 * Start tearing down \a import: remove it from the handle hash, bump its
 * generation under imp_lock and release one reference.
 *
 * \param import  must be a valid, non-poisoned import pointer
 */
void class_destroy_import(struct obd_import *import)
{
        LASSERT(import != NULL);
        LASSERT(import != LP_POISON);

        class_handle_unhash(&import->imp_handle);

        cfs_spin_lock(&import->imp_lock);
        import->imp_generation++;
        cfs_spin_unlock(&import->imp_lock);
        /* actual destruction happens once the last reference is put */
        class_import_put(import);
}
EXPORT_SYMBOL(class_destroy_import);
1007
1008 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1009
1010 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1011 {
1012         cfs_spin_lock(&exp->exp_locks_list_guard);
1013
1014         LASSERT(lock->l_exp_refs_nr >= 0);
1015
1016         if (lock->l_exp_refs_target != NULL &&
1017             lock->l_exp_refs_target != exp) {
1018                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1019                               exp, lock, lock->l_exp_refs_target);
1020         }
1021         if ((lock->l_exp_refs_nr ++) == 0) {
1022                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1023                 lock->l_exp_refs_target = exp;
1024         }
1025         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1026                lock, exp, lock->l_exp_refs_nr);
1027         cfs_spin_unlock(&exp->exp_locks_list_guard);
1028 }
1029 EXPORT_SYMBOL(__class_export_add_lock_ref);
1030
1031 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1032 {
1033         cfs_spin_lock(&exp->exp_locks_list_guard);
1034         LASSERT(lock->l_exp_refs_nr > 0);
1035         if (lock->l_exp_refs_target != exp) {
1036                 LCONSOLE_WARN("lock %p, "
1037                               "mismatching export pointers: %p, %p\n",
1038                               lock, lock->l_exp_refs_target, exp);
1039         }
1040         if (-- lock->l_exp_refs_nr == 0) {
1041                 cfs_list_del_init(&lock->l_exp_refs_link);
1042                 lock->l_exp_refs_target = NULL;
1043         }
1044         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1045                lock, exp, lock->l_exp_refs_nr);
1046         cfs_spin_unlock(&exp->exp_locks_list_guard);
1047 }
1048 EXPORT_SYMBOL(__class_export_del_lock_ref);
1049 #endif
1050
1051 /* A connection defines an export context in which preallocation can
1052    be managed. This releases the export pointer reference, and returns
1053    the export handle, so the export refcount is 1 when this function
1054    returns. */
1055 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1056                   struct obd_uuid *cluuid)
1057 {
1058         struct obd_export *export;
1059         LASSERT(conn != NULL);
1060         LASSERT(obd != NULL);
1061         LASSERT(cluuid != NULL);
1062         ENTRY;
1063
1064         export = class_new_export(obd, cluuid);
1065         if (IS_ERR(export))
1066                 RETURN(PTR_ERR(export));
1067
1068         conn->cookie = export->exp_handle.h_cookie;
1069         class_export_put(export);
1070
1071         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1072                cluuid->uuid, conn->cookie);
1073         RETURN(0);
1074 }
1075 EXPORT_SYMBOL(class_connect);
1076
/**
 * If the export is involved in recovery then clean up the related state:
 * the per-device client counters and the export's recovery/replay flags.
 *
 * Device counters are adjusted under obd_recovery_task_lock; each export
 * flag is cleared under the export's own exp_lock.
 */
void class_export_recovery_cleanup(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;

        cfs_spin_lock(&obd->obd_recovery_task_lock);
        if (exp->exp_delayed)
                obd->obd_delayed_clients--;
        /* the export no longer counts as a connected client of a
         * recovering device */
        if (obd->obd_recovering && exp->exp_in_recovery) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_in_recovery = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(obd->obd_connected_clients);
                obd->obd_connected_clients--;
        }
        cfs_spin_unlock(&obd->obd_recovery_task_lock);
        /** Cleanup req replay fields */
        if (exp->exp_req_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_req_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
                cfs_atomic_dec(&obd->obd_req_replay_clients);
        }
        /** Cleanup lock replay data */
        if (exp->exp_lock_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_lock_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
                cfs_atomic_dec(&obd->obd_lock_replay_clients);
        }
}
1110
/* This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in
 * and, if a disconnect is really needed,
 * 2 - for removing it from the hash
 * 3 - in class_unlink_export()
 * The export pointer passed to this function may be destroyed by the
 * time it returns.
 *
 * Returns -EINVAL for a NULL export, 0 otherwise (including the case
 * where the export had already been disconnected). */
int class_disconnect(struct obd_export *export)
{
        int already_disconnected;
        ENTRY;

        if (export == NULL) {
                fixme();
                CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
                RETURN(-EINVAL);
        }

        /* test-and-set exp_disconnected so only one caller proceeds */
        cfs_spin_lock(&export->exp_lock);
        already_disconnected = export->exp_disconnected;
        export->exp_disconnected = 1;
        cfs_spin_unlock(&export->exp_lock);

        /* class_cleanup(), abort_recovery(), and class_fail_export()
         * all end up in here, and if any of them race we shouldn't
         * call extra class_export_puts(). */
        if (already_disconnected) {
                LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
                GOTO(no_disconn, already_disconnected);
        }

        CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
               export->exp_handle.h_cookie);

        /* drop the export from the per-device NID hash if it is in there */
        if (!cfs_hlist_unhashed(&export->exp_nid_hash))
                cfs_hash_del(export->exp_obd->obd_nid_hash,
                             &export->exp_connection->c_peer.nid,
                             &export->exp_nid_hash);

        class_export_recovery_cleanup(export);
        class_unlink_export(export);
no_disconn:
        /* release the reference that came with the pointer passed in */
        class_export_put(export);
        RETURN(0);
}
1155
1156 /* Return non-zero for a fully connected export */
1157 int class_connected_export(struct obd_export *exp)
1158 {
1159         if (exp) {
1160                 int connected;
1161                 cfs_spin_lock(&exp->exp_lock);
1162                 connected = (exp->exp_conn_cnt > 0);
1163                 cfs_spin_unlock(&exp->exp_lock);
1164                 return connected;
1165         }
1166         return 0;
1167 }
1168 EXPORT_SYMBOL(class_connected_export);
1169
/**
 * Disconnect every export linked (through exp_obd_chain) on \a list.
 *
 * Each export has its exp_flags overwritten with \a flags and is then
 * passed to obd_disconnect(), except for an export whose client uuid
 * equals the device uuid, which is only unlinked from \a list.
 *
 * \param list   private work list of exports; loop runs until it is empty
 * \param flags  value stored into exp_flags of every export on the list
 */
static void class_disconnect_export_list(cfs_list_t *list,
                                         enum obd_option flags)
{
        int rc;
        struct obd_export *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!cfs_list_empty(list)) {
                exp = cfs_list_entry(list->next, struct obd_export,
                                     exp_obd_chain);
                /* need for safe call CDEBUG after obd_disconnect */
                class_export_get(exp);

                cfs_spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                cfs_spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        cfs_list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* second reference: this is the one obd_disconnect() is
                 * expected to consume */
                class_export_get(exp);
                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at "CFS_TIME_T"\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                /* release one export reference anyway */
                rc = obd_disconnect(exp);

                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
                       obd_export_nid2str(exp), exp, rc);
                /* drop the reference taken at the top of the loop */
                class_export_put(exp);
        }
        EXIT;
}
1215
1216 void class_disconnect_exports(struct obd_device *obd)
1217 {
1218         cfs_list_t work_list;
1219         ENTRY;
1220
1221         /* Move all of the exports from obd_exports to a work list, en masse. */
1222         CFS_INIT_LIST_HEAD(&work_list);
1223         cfs_spin_lock(&obd->obd_dev_lock);
1224         cfs_list_splice_init(&obd->obd_exports, &work_list);
1225         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1226         cfs_spin_unlock(&obd->obd_dev_lock);
1227
1228         if (!cfs_list_empty(&work_list)) {
1229                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1230                        "disconnecting them\n", obd->obd_minor, obd);
1231                 class_disconnect_export_list(&work_list,
1232                                              exp_flags_from_obd(obd));
1233         } else
1234                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1235                        obd->obd_minor, obd);
1236         EXIT;
1237 }
1238 EXPORT_SYMBOL(class_disconnect_exports);
1239
1240 /* Remove exports that have not completed recovery.
1241  */
1242 void class_disconnect_stale_exports(struct obd_device *obd,
1243                                     int (*test_export)(struct obd_export *))
1244 {
1245         cfs_list_t work_list;
1246         cfs_list_t *pos, *n;
1247         struct obd_export *exp;
1248         int evicted = 0;
1249         ENTRY;
1250
1251         CFS_INIT_LIST_HEAD(&work_list);
1252         cfs_spin_lock(&obd->obd_dev_lock);
1253         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1254                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1255                 if (test_export(exp))
1256                         continue;
1257
1258                 /* don't count self-export as client */
1259                 if (obd_uuid_equals(&exp->exp_client_uuid,
1260                                     &exp->exp_obd->obd_uuid))
1261                         continue;
1262
1263                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1264                 evicted++;
1265                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1266                        obd->obd_name, exp->exp_client_uuid.uuid,
1267                        exp->exp_connection == NULL ? "<unknown>" :
1268                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1269                 print_export_data(exp, "EVICTING", 0);
1270         }
1271         cfs_spin_unlock(&obd->obd_dev_lock);
1272
1273         if (evicted) {
1274                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1275                        obd->obd_name, evicted);
1276                 obd->obd_stale_clients += evicted;
1277         }
1278         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1279                                                  OBD_OPT_ABORT_RECOV);
1280         EXIT;
1281 }
1282 EXPORT_SYMBOL(class_disconnect_stale_exports);
1283
1284 void class_fail_export(struct obd_export *exp)
1285 {
1286         int rc, already_failed;
1287
1288         cfs_spin_lock(&exp->exp_lock);
1289         already_failed = exp->exp_failed;
1290         exp->exp_failed = 1;
1291         cfs_spin_unlock(&exp->exp_lock);
1292
1293         if (already_failed) {
1294                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1295                        exp, exp->exp_client_uuid.uuid);
1296                 return;
1297         }
1298
1299         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1300                exp, exp->exp_client_uuid.uuid);
1301
1302         if (obd_dump_on_timeout)
1303                 libcfs_debug_dumplog();
1304
1305         /* Most callers into obd_disconnect are removing their own reference
1306          * (request, for example) in addition to the one from the hash table.
1307          * We don't have such a reference here, so make one. */
1308         class_export_get(exp);
1309         rc = obd_disconnect(exp);
1310         if (rc)
1311                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1312         else
1313                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1314                        exp, exp->exp_client_uuid.uuid);
1315 }
1316 EXPORT_SYMBOL(class_fail_export);
1317
1318 char *obd_export_nid2str(struct obd_export *exp)
1319 {
1320         if (exp->exp_connection != NULL)
1321                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1322
1323         return "(no nid)";
1324 }
1325 EXPORT_SYMBOL(obd_export_nid2str);
1326
1327 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1328 {
1329         struct obd_export *doomed_exp = NULL;
1330         int exports_evicted = 0;
1331
1332         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1333
1334         do {
1335                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1336                 if (doomed_exp == NULL)
1337                         break;
1338
1339                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1340                          "nid %s found, wanted nid %s, requested nid %s\n",
1341                          obd_export_nid2str(doomed_exp),
1342                          libcfs_nid2str(nid_key), nid);
1343                 LASSERTF(doomed_exp != obd->obd_self_export,
1344                          "self-export is hashed by NID?\n");
1345                 exports_evicted++;
1346                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1347                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1348                        exports_evicted);
1349                 class_fail_export(doomed_exp);
1350                 class_export_put(doomed_exp);
1351         } while (1);
1352
1353         if (!exports_evicted)
1354                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1355                        obd->obd_name, nid);
1356         return exports_evicted;
1357 }
1358 EXPORT_SYMBOL(obd_export_evict_by_nid);
1359
1360 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1361 {
1362         struct obd_export *doomed_exp = NULL;
1363         struct obd_uuid doomed_uuid;
1364         int exports_evicted = 0;
1365
1366         obd_str2uuid(&doomed_uuid, uuid);
1367         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1368                 CERROR("%s: can't evict myself\n", obd->obd_name);
1369                 return exports_evicted;
1370         }
1371
1372         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1373
1374         if (doomed_exp == NULL) {
1375                 CERROR("%s: can't disconnect %s: no exports found\n",
1376                        obd->obd_name, uuid);
1377         } else {
1378                 CWARN("%s: evicting %s at adminstrative request\n",
1379                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1380                 class_fail_export(doomed_exp);
1381                 class_export_put(doomed_exp);
1382                 exports_evicted++;
1383         }
1384
1385         return exports_evicted;
1386 }
1387 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1388
1389 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback.  print_export_data() invokes it for
 * an export when its 'locks' argument is non-zero and the hook is set;
 * it stays NULL until some other code installs a function here. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
1392 #endif
1393
1394 static void print_export_data(struct obd_export *exp, const char *status,
1395                               int locks)
1396 {
1397         struct ptlrpc_reply_state *rs;
1398         struct ptlrpc_reply_state *first_reply = NULL;
1399         int nreplies = 0;
1400
1401         cfs_spin_lock(&exp->exp_lock);
1402         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1403                                 rs_exp_list) {
1404                 if (nreplies == 0)
1405                         first_reply = rs;
1406                 nreplies++;
1407         }
1408         cfs_spin_unlock(&exp->exp_lock);
1409
1410         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1411                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1412                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1413                cfs_atomic_read(&exp->exp_rpc_count),
1414                cfs_atomic_read(&exp->exp_cb_count),
1415                cfs_atomic_read(&exp->exp_locks_count),
1416                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1417                nreplies, first_reply, nreplies > 3 ? "..." : "",
1418                exp->exp_last_committed);
1419 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1420         if (locks && class_export_dump_hook != NULL)
1421                 class_export_dump_hook(exp);
1422 #endif
1423 }
1424
/**
 * Log a summary line (see print_export_data()) for every export known
 * for \a obd, tagged by the list it was found on.
 *
 * The device's active, unlinked and delayed lists are walked under
 * obd_dev_lock; the global zombie export list is walked under
 * obd_zombie_impexp_lock and is not filtered by device.
 *
 * \param obd    device whose exports are dumped
 * \param locks  passed through to print_export_data()
 */
void dump_exports(struct obd_device *obd, int locks)
{
        struct obd_export *exp;

        cfs_spin_lock(&obd->obd_dev_lock);
        cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
                print_export_data(exp, "ACTIVE", locks);
        cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
                print_export_data(exp, "UNLINKED", locks);
        cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
                print_export_data(exp, "DELAYED", locks);
        cfs_spin_unlock(&obd->obd_dev_lock);
        cfs_spin_lock(&obd_zombie_impexp_lock);
        cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
                print_export_data(exp, "ZOMBIE", locks);
        cfs_spin_unlock(&obd_zombie_impexp_lock);
}
EXPORT_SYMBOL(dump_exports);
1443
/**
 * Block until the device's obd_unlinked_exports list is empty.
 *
 * The list is polled under obd_dev_lock; between polls the lock is
 * dropped and the caller sleeps with a timeout of \c waited seconds,
 * which starts at 2 and doubles after every sleep.  Once \c waited
 * exceeds 5 a console warning is printed and all exports are dumped.
 *
 * \param obd  device to wait on; obd_exports must already be empty
 */
void obd_exports_barrier(struct obd_device *obd)
{
        int waited = 2;
        LASSERT(cfs_list_empty(&obd->obd_exports));
        cfs_spin_lock(&obd->obd_dev_lock);
        while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
                /* never sleep with the spinlock held */
                cfs_spin_unlock(&obd->obd_dev_lock);
                cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
                                                   cfs_time_seconds(waited));
                if (waited > 5 && IS_PO2(waited)) {
                        LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
                                      "more than %d seconds. "
                                      "The obd refcount = %d. Is it stuck?\n",
                                      obd->obd_name, waited,
                                      cfs_atomic_read(&obd->obd_refcount));
                        dump_exports(obd, 1);
                }
                waited *= 2;
                /* retake the lock before re-checking the list */
                cfs_spin_lock(&obd->obd_dev_lock);
        }
        cfs_spin_unlock(&obd->obd_dev_lock);
}
EXPORT_SYMBOL(obd_exports_barrier);
1467
/**
 * kill zombie imports and exports
 *
 * Each pass removes at most one import and one export from the heads of
 * the global zombie lists while holding obd_zombie_impexp_lock, destroys
 * them with the lock released, and then offers to reschedule.  The loop
 * ends after a pass in which both lists were found empty.
 */
void obd_zombie_impexp_cull(void)
{
        struct obd_import *import;
        struct obd_export *export;
        ENTRY;

        do {
                cfs_spin_lock(&obd_zombie_impexp_lock);

                import = NULL;
                if (!cfs_list_empty(&obd_zombie_imports)) {
                        import = cfs_list_entry(obd_zombie_imports.next,
                                                struct obd_import,
                                                imp_zombie_chain);
                        cfs_list_del_init(&import->imp_zombie_chain);
                }

                export = NULL;
                if (!cfs_list_empty(&obd_zombie_exports)) {
                        export = cfs_list_entry(obd_zombie_exports.next,
                                                struct obd_export,
                                                exp_obd_chain);
                        cfs_list_del_init(&export->exp_obd_chain);
                }

                cfs_spin_unlock(&obd_zombie_impexp_lock);

                /* destruction happens outside the spinlock */
                if (import != NULL)
                        class_import_destroy(import);

                if (export != NULL)
                        class_export_destroy(export);

                cfs_cond_resched();
        } while (import != NULL || export != NULL);
        EXIT;
}
1508
/* completed by the zombie thread once it has (tried to) daemonize;
 * obd_zombie_impexp_init() waits on it */
static cfs_completion_t         obd_zombie_start;
/* completed by the zombie thread when it leaves its main loop;
 * obd_zombie_impexp_stop() waits on it */
static cfs_completion_t         obd_zombie_stop;
/* flag word tested/set with cfs_test_bit()/cfs_set_bit() */
static unsigned long            obd_zombie_flags;
/* signalled when a zombie is queued and after each cull pass */
static cfs_waitq_t              obd_zombie_waitq;
/* pid of the zombie thread, 0 until the thread has recorded it */
static pid_t                    obd_zombie_pid;

enum {
        /* NOTE(review): this value is handed to cfs_set_bit()/cfs_test_bit()
         * as the bit argument; if those take a bit index (as Linux
         * set_bit() does) this selects bit 2, not mask 0x2 -- harmless as
         * long as every user goes through the same helpers, but confirm */
        OBD_ZOMBIE_STOP   = 1 << 1
};
1518
1519 /**
1520  * check for work for kill zombie import/export thread.
1521  */
1522 static int obd_zombie_impexp_check(void *arg)
1523 {
1524         int rc;
1525
1526         cfs_spin_lock(&obd_zombie_impexp_lock);
1527         rc = cfs_list_empty(&obd_zombie_imports) &&
1528              cfs_list_empty(&obd_zombie_exports) &&
1529              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1530
1531         cfs_spin_unlock(&obd_zombie_impexp_lock);
1532
1533         RETURN(rc);
1534 }
1535
1536 /**
1537  * Add export to the obd_zombe thread and notify it.
1538  */
1539 static void obd_zombie_export_add(struct obd_export *exp) {
1540         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1541         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1542         cfs_list_del_init(&exp->exp_obd_chain);
1543         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1544         cfs_spin_lock(&obd_zombie_impexp_lock);
1545         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1546         cfs_spin_unlock(&obd_zombie_impexp_lock);
1547
1548         if (obd_zombie_impexp_notify != NULL)
1549                 obd_zombie_impexp_notify();
1550 }
1551
1552 /**
1553  * Add import to the obd_zombe thread and notify it.
1554  */
1555 static void obd_zombie_import_add(struct obd_import *imp) {
1556         LASSERT(imp->imp_sec == NULL);
1557         cfs_spin_lock(&obd_zombie_impexp_lock);
1558         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1559         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1560         cfs_spin_unlock(&obd_zombie_impexp_lock);
1561
1562         if (obd_zombie_impexp_notify != NULL)
1563                 obd_zombie_impexp_notify();
1564 }
1565
/**
 * notify import/export destroy thread about new zombie.
 *
 * Implemented as a signal on obd_zombie_waitq, the wait queue the zombie
 * thread and obd_zombie_barrier() both wait on.
 */
static void obd_zombie_impexp_notify(void)
{
        cfs_waitq_signal(&obd_zombie_waitq);
}
1573
1574 /**
1575  * check whether obd_zombie is idle
1576  */
1577 static int obd_zombie_is_idle(void)
1578 {
1579         int rc;
1580
1581         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1582         cfs_spin_lock(&obd_zombie_impexp_lock);
1583         rc = cfs_list_empty(&obd_zombie_imports) &&
1584              cfs_list_empty(&obd_zombie_exports);
1585         cfs_spin_unlock(&obd_zombie_impexp_lock);
1586         return rc;
1587 }
1588
1589 /**
1590  * wait when obd_zombie import/export queues become empty
1591  */
1592 void obd_zombie_barrier(void)
1593 {
1594         struct l_wait_info lwi = { 0 };
1595
1596         if (obd_zombie_pid == cfs_curproc_pid())
1597                 /* don't wait for myself */
1598                 return;
1599         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1600 }
1601 EXPORT_SYMBOL(obd_zombie_barrier);
1602
1603 #ifdef __KERNEL__
1604
1605 /**
1606  * destroy zombie export/import thread.
1607  */
1608 static int obd_zombie_impexp_thread(void *unused)
1609 {
1610         int rc;
1611
1612         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1613                 cfs_complete(&obd_zombie_start);
1614                 RETURN(rc);
1615         }
1616
1617         cfs_complete(&obd_zombie_start);
1618
1619         obd_zombie_pid = cfs_curproc_pid();
1620
1621         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1622                 struct l_wait_info lwi = { 0 };
1623
1624                 l_wait_event(obd_zombie_waitq,
1625                              !obd_zombie_impexp_check(NULL), &lwi);
1626                 obd_zombie_impexp_cull();
1627
1628                 /*
1629                  * Notify obd_zombie_barrier callers that queues
1630                  * may be empty.
1631                  */
1632                 cfs_waitq_signal(&obd_zombie_waitq);
1633         }
1634
1635         cfs_complete(&obd_zombie_stop);
1636
1637         RETURN(0);
1638 }
1639
1640 #else /* ! KERNEL */
1641
/* nesting counter: obd_zombie_impexp_kill() only culls when it is the
 * outermost active invocation */
static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* handles returned by the liblustre callback registration calls in
 * obd_zombie_impexp_init(); handed back on deregistration */
static void *obd_zombie_impexp_work_cb;
static void *obd_zombie_impexp_idle_cb;
1645
1646 int obd_zombie_impexp_kill(void *arg)
1647 {
1648         int rc = 0;
1649
1650         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1651                 obd_zombie_impexp_cull();
1652                 rc = 1;
1653         }
1654         cfs_atomic_dec(&zombie_recur);
1655         return rc;
1656 }
1657
1658 #endif
1659
1660 /**
1661  * start destroy zombie import/export thread
1662  */
1663 int obd_zombie_impexp_init(void)
1664 {
1665         int rc;
1666
1667         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1668         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1669         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1670         cfs_init_completion(&obd_zombie_start);
1671         cfs_init_completion(&obd_zombie_stop);
1672         cfs_waitq_init(&obd_zombie_waitq);
1673         obd_zombie_pid = 0;
1674
1675 #ifdef __KERNEL__
1676         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1677         if (rc < 0)
1678                 RETURN(rc);
1679
1680         cfs_wait_for_completion(&obd_zombie_start);
1681 #else
1682
1683         obd_zombie_impexp_work_cb =
1684                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1685                                                  &obd_zombie_impexp_kill, NULL);
1686
1687         obd_zombie_impexp_idle_cb =
1688                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1689                                                  &obd_zombie_impexp_check, NULL);
1690         rc = 0;
1691 #endif
1692         RETURN(rc);
1693 }
/**
 * stop destroy zombie import/export thread
 *
 * Sets the stop bit and wakes the zombie wait queue.  The kernel build
 * then blocks until the thread has signalled obd_zombie_stop; the
 * liblustre build deregisters the two callbacks registered at init time.
 */
void obd_zombie_impexp_stop(void)
{
        cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
#ifdef __KERNEL__
        cfs_wait_for_completion(&obd_zombie_stop);
#else
        liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
        liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
#endif
}
1708