Whamcloud - gitweb
LU-884 osc: remove unlikely() marker for checksums
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/obdclass/genops.c
39  *
40  * These are the only exported functions, they provide some generic
41  * infrastructure for managing object devices
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48 #include <obd_ost.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
51
52 extern cfs_list_t obd_types;
53 cfs_spinlock_t obd_types_lock;
54
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
59
60 cfs_list_t      obd_zombie_imports;
61 cfs_list_t      obd_zombie_exports;
62 cfs_spinlock_t  obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64 static void obd_zombie_export_add(struct obd_export *exp);
65 static void obd_zombie_import_add(struct obd_import *imp);
66 static void print_export_data(struct obd_export *exp,
67                               const char *status, int locks);
68
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70
71 /*
72  * support functions: we could use inter-module communication, but this
73  * is more portable to other OS's
74  */
75 static struct obd_device *obd_device_alloc(void)
76 {
77         struct obd_device *obd;
78
79         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80         if (obd != NULL) {
81                 obd->obd_magic = OBD_DEVICE_MAGIC;
82         }
83         return obd;
84 }
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         cfs_list_t *tmp;
103         struct obd_type *type;
104
105         cfs_spin_lock(&obd_types_lock);
106         cfs_list_for_each(tmp, &obd_types) {
107                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         cfs_spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         cfs_spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         cfs_spin_lock(&obd_types_lock);
253         cfs_list_del(&type->typ_chain);
254         cfs_spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         RETURN(0);
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *         pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283         ENTRY;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0]='\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         cfs_write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 RETURN(ERR_PTR(-EOVERFLOW));
341         }
342
343         if (IS_ERR(result)) {
344                 obd_device_free(newdev);
345                 class_put_type(type);
346         } else {
347                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                        result->obd_name, result);
349         }
350         RETURN(result);
351 }
352
353 void class_release_dev(struct obd_device *obd)
354 {
355         struct obd_type *obd_type = obd->obd_type;
356
357         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361         LASSERT(obd_type != NULL);
362
363         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365
366         cfs_write_lock(&obd_dev_lock);
367         obd_devs[obd->obd_minor] = NULL;
368         cfs_write_unlock(&obd_dev_lock);
369         obd_device_free(obd);
370
371         class_put_type(obd_type);
372 }
373
374 int class_name2dev(const char *name)
375 {
376         int i;
377
378         if (!name)
379                 return -1;
380
381         cfs_read_lock(&obd_dev_lock);
382         for (i = 0; i < class_devno_max(); i++) {
383                 struct obd_device *obd = class_num2obd(i);
384
385                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386                         /* Make sure we finished attaching before we give
387                            out any references */
388                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389                         if (obd->obd_attached) {
390                                 cfs_read_unlock(&obd_dev_lock);
391                                 return i;
392                         }
393                         break;
394                 }
395         }
396         cfs_read_unlock(&obd_dev_lock);
397
398         return -1;
399 }
400 EXPORT_SYMBOL(class_name2dev);
401
402 struct obd_device *class_name2obd(const char *name)
403 {
404         int dev = class_name2dev(name);
405
406         if (dev < 0 || dev > class_devno_max())
407                 return NULL;
408         return class_num2obd(dev);
409 }
410 EXPORT_SYMBOL(class_name2obd);
411
412 int class_uuid2dev(struct obd_uuid *uuid)
413 {
414         int i;
415
416         cfs_read_lock(&obd_dev_lock);
417         for (i = 0; i < class_devno_max(); i++) {
418                 struct obd_device *obd = class_num2obd(i);
419
420                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         cfs_read_unlock(&obd_dev_lock);
423                         return i;
424                 }
425         }
426         cfs_read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430 EXPORT_SYMBOL(class_uuid2dev);
431
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 {
434         int dev = class_uuid2dev(uuid);
435         if (dev < 0)
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_uuid2obd);
440
441 /**
442  * Get obd device from ::obd_devs[]
443  *
444  * \param num [in] array index
445  *
446  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447  *         otherwise return the obd device there.
448  */
449 struct obd_device *class_num2obd(int num)
450 {
451         struct obd_device *obd = NULL;
452
453         if (num < class_devno_max()) {
454                 obd = obd_devs[num];
455                 if (obd == NULL)
456                         return NULL;
457
458                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459                          "%p obd_magic %08x != %08x\n",
460                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461                 LASSERTF(obd->obd_minor == num,
462                          "%p obd_minor %0d != %0d\n",
463                          obd, obd->obd_minor, num);
464         }
465
466         return obd;
467 }
468 EXPORT_SYMBOL(class_num2obd);
469
470 void class_obd_list(void)
471 {
472         char *status;
473         int i;
474
475         cfs_read_lock(&obd_dev_lock);
476         for (i = 0; i < class_devno_max(); i++) {
477                 struct obd_device *obd = class_num2obd(i);
478
479                 if (obd == NULL)
480                         continue;
481                 if (obd->obd_stopping)
482                         status = "ST";
483                 else if (obd->obd_set_up)
484                         status = "UP";
485                 else if (obd->obd_attached)
486                         status = "AT";
487                 else
488                         status = "--";
489                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490                          i, status, obd->obd_type->typ_name,
491                          obd->obd_name, obd->obd_uuid.uuid,
492                          cfs_atomic_read(&obd->obd_refcount));
493         }
494         cfs_read_unlock(&obd_dev_lock);
495         return;
496 }
497
498 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
499    specified, then only the client with that uuid is returned,
500    otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502                                           const char * typ_name,
503                                           struct obd_uuid *grp_uuid)
504 {
505         int i;
506
507         cfs_read_lock(&obd_dev_lock);
508         for (i = 0; i < class_devno_max(); i++) {
509                 struct obd_device *obd = class_num2obd(i);
510
511                 if (obd == NULL)
512                         continue;
513                 if ((strncmp(obd->obd_type->typ_name, typ_name,
514                              strlen(typ_name)) == 0)) {
515                         if (obd_uuid_equals(tgt_uuid,
516                                             &obd->u.cli.cl_target_uuid) &&
517                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
518                                                          &obd->obd_uuid) : 1)) {
519                                 cfs_read_unlock(&obd_dev_lock);
520                                 return obd;
521                         }
522                 }
523         }
524         cfs_read_unlock(&obd_dev_lock);
525
526         return NULL;
527 }
528 EXPORT_SYMBOL(class_find_client_obd);
529
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531    searching at *next, and if a device is found, the next index to look
532    at is saved in *next. If next is NULL, then the first matching device
533    will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
535 {
536         int i;
537
538         if (next == NULL)
539                 i = 0;
540         else if (*next >= 0 && *next < class_devno_max())
541                 i = *next;
542         else
543                 return NULL;
544
545         cfs_read_lock(&obd_dev_lock);
546         for (; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552                         if (next != NULL)
553                                 *next = i+1;
554                         cfs_read_unlock(&obd_dev_lock);
555                         return obd;
556                 }
557         }
558         cfs_read_unlock(&obd_dev_lock);
559
560         return NULL;
561 }
562 EXPORT_SYMBOL(class_devices_in_group);
563
564 /**
565  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566  * adjust sptlrpc settings accordingly.
567  */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 {
570         struct obd_device  *obd;
571         const char         *type;
572         int                 i, rc = 0, rc2;
573
574         LASSERT(namelen > 0);
575
576         cfs_read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 obd = class_num2obd(i);
579
580                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581                         continue;
582
583                 /* only notify mdc, osc, mdt, ost */
584                 type = obd->obd_type->typ_name;
585                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OST_NAME) != 0)
589                         continue;
590
591                 if (strncmp(obd->obd_name, fsname, namelen))
592                         continue;
593
594                 class_incref(obd, __FUNCTION__, obd);
595                 cfs_read_unlock(&obd_dev_lock);
596                 rc2 = obd_set_info_async(obd->obd_self_export,
597                                          sizeof(KEY_SPTLRPC_CONF),
598                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
599                 rc = rc ? rc : rc2;
600                 class_decref(obd, __FUNCTION__, obd);
601                 cfs_read_lock(&obd_dev_lock);
602         }
603         cfs_read_unlock(&obd_dev_lock);
604         return rc;
605 }
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607
608 void obd_cleanup_caches(void)
609 {
610         int rc;
611
612         ENTRY;
613         if (obd_device_cachep) {
614                 rc = cfs_mem_cache_destroy(obd_device_cachep);
615                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616                 obd_device_cachep = NULL;
617         }
618         if (obdo_cachep) {
619                 rc = cfs_mem_cache_destroy(obdo_cachep);
620                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
621                 obdo_cachep = NULL;
622         }
623         if (import_cachep) {
624                 rc = cfs_mem_cache_destroy(import_cachep);
625                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626                 import_cachep = NULL;
627         }
628         if (capa_cachep) {
629                 rc = cfs_mem_cache_destroy(capa_cachep);
630                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
631                 capa_cachep = NULL;
632         }
633         EXIT;
634 }
635
636 int obd_init_caches(void)
637 {
638         ENTRY;
639
640         LASSERT(obd_device_cachep == NULL);
641         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642                                                  sizeof(struct obd_device),
643                                                  0, 0);
644         if (!obd_device_cachep)
645                 GOTO(out, -ENOMEM);
646
647         LASSERT(obdo_cachep == NULL);
648         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
649                                            0, 0);
650         if (!obdo_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(import_cachep == NULL);
654         import_cachep = cfs_mem_cache_create("ll_import_cache",
655                                              sizeof(struct obd_import),
656                                              0, 0);
657         if (!import_cachep)
658                 GOTO(out, -ENOMEM);
659
660         LASSERT(capa_cachep == NULL);
661         capa_cachep = cfs_mem_cache_create("capa_cache",
662                                            sizeof(struct obd_capa), 0, 0);
663         if (!capa_cachep)
664                 GOTO(out, -ENOMEM);
665
666         RETURN(0);
667  out:
668         obd_cleanup_caches();
669         RETURN(-ENOMEM);
670
671 }
672
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 {
676         struct obd_export *export;
677         ENTRY;
678
679         if (!conn) {
680                 CDEBUG(D_CACHE, "looking for null handle\n");
681                 RETURN(NULL);
682         }
683
684         if (conn->cookie == -1) {  /* this means assign a new connection */
685                 CDEBUG(D_CACHE, "want a new connection\n");
686                 RETURN(NULL);
687         }
688
689         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690         export = class_handle2object(conn->cookie);
691         RETURN(export);
692 }
693 EXPORT_SYMBOL(class_conn2export);
694
695 struct obd_device *class_exp2obd(struct obd_export *exp)
696 {
697         if (exp)
698                 return exp->exp_obd;
699         return NULL;
700 }
701 EXPORT_SYMBOL(class_exp2obd);
702
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         export = class_conn2export(conn);
707         if (export) {
708                 struct obd_device *obd = export->exp_obd;
709                 class_export_put(export);
710                 return obd;
711         }
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_conn2obd);
715
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         if (obd == NULL)
720                 return NULL;
721         return obd->u.cli.cl_import;
722 }
723 EXPORT_SYMBOL(class_exp2cliimp);
724
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 {
727         struct obd_device *obd = class_conn2obd(conn);
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_conn2cliimp);
733
734 /* Export management functions */
735 static void class_export_destroy(struct obd_export *exp)
736 {
737         struct obd_device *obd = exp->exp_obd;
738         ENTRY;
739
740         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
741
742         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
743                exp->exp_client_uuid.uuid, obd->obd_name);
744
745         LASSERT(obd != NULL);
746
747         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
748         if (exp->exp_connection)
749                 ptlrpc_put_connection_superhack(exp->exp_connection);
750
751         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
752         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
753         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
754         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
755         obd_destroy_export(exp);
756         class_decref(obd, "export", exp);
757
758         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
759         EXIT;
760 }
761
762 static void export_handle_addref(void *export)
763 {
764         class_export_get(export);
765 }
766
767 struct obd_export *class_export_get(struct obd_export *exp)
768 {
769         cfs_atomic_inc(&exp->exp_refcount);
770         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
771                cfs_atomic_read(&exp->exp_refcount));
772         return exp;
773 }
774 EXPORT_SYMBOL(class_export_get);
775
776 void class_export_put(struct obd_export *exp)
777 {
778         LASSERT(exp != NULL);
779         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
780         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
781                cfs_atomic_read(&exp->exp_refcount) - 1);
782
783         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
784                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
785                 CDEBUG(D_IOCTL, "final put %p/%s\n",
786                        exp, exp->exp_client_uuid.uuid);
787
788                 /* release nid stat refererence */
789                 lprocfs_exp_cleanup(exp);
790
791                 obd_zombie_export_add(exp);
792         }
793 }
794 EXPORT_SYMBOL(class_export_put);
795
796 /* Creates a new export, adds it to the hash table, and returns a
797  * pointer to it. The refcount is 2: one for the hash reference, and
798  * one for the pointer returned by this function. */
799 struct obd_export *class_new_export(struct obd_device *obd,
800                                     struct obd_uuid *cluuid)
801 {
802         struct obd_export *export;
803         cfs_hash_t *hash = NULL;
804         int rc = 0;
805         ENTRY;
806
807         OBD_ALLOC_PTR(export);
808         if (!export)
809                 return ERR_PTR(-ENOMEM);
810
811         export->exp_conn_cnt = 0;
812         export->exp_lock_hash = NULL;
813         cfs_atomic_set(&export->exp_refcount, 2);
814         cfs_atomic_set(&export->exp_rpc_count, 0);
815         cfs_atomic_set(&export->exp_cb_count, 0);
816         cfs_atomic_set(&export->exp_locks_count, 0);
817 #if LUSTRE_TRACKS_LOCK_EXP_REFS
818         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
819         cfs_spin_lock_init(&export->exp_locks_list_guard);
820 #endif
821         cfs_atomic_set(&export->exp_replay_count, 0);
822         export->exp_obd = obd;
823         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
824         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
825         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
826         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
827         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
828         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
829         class_handle_hash(&export->exp_handle, export_handle_addref);
830         export->exp_last_request_time = cfs_time_current_sec();
831         cfs_spin_lock_init(&export->exp_lock);
832         cfs_spin_lock_init(&export->exp_rpc_lock);
833         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
834         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
835         cfs_spin_lock_init(&export->exp_bl_list_lock);
836         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
837
838         export->exp_sp_peer = LUSTRE_SP_ANY;
839         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
840         export->exp_client_uuid = *cluuid;
841         obd_init_export(export);
842
843         cfs_spin_lock(&obd->obd_dev_lock);
844          /* shouldn't happen, but might race */
845         if (obd->obd_stopping)
846                 GOTO(exit_unlock, rc = -ENODEV);
847
848         hash = cfs_hash_getref(obd->obd_uuid_hash);
849         if (hash == NULL)
850                 GOTO(exit_unlock, rc = -ENODEV);
851         cfs_spin_unlock(&obd->obd_dev_lock);
852
853         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
854                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
855                 if (rc != 0) {
856                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
857                                       obd->obd_name, cluuid->uuid, rc);
858                         GOTO(exit_err, rc = -EALREADY);
859                 }
860         }
861
862         cfs_spin_lock(&obd->obd_dev_lock);
863         if (obd->obd_stopping) {
864                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
865                 GOTO(exit_unlock, rc = -ENODEV);
866         }
867
868         class_incref(obd, "export", export);
869         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
870         cfs_list_add_tail(&export->exp_obd_chain_timed,
871                           &export->exp_obd->obd_exports_timed);
872         export->exp_obd->obd_num_exports++;
873         cfs_spin_unlock(&obd->obd_dev_lock);
874         cfs_hash_putref(hash);
875         RETURN(export);
876
877 exit_unlock:
878         cfs_spin_unlock(&obd->obd_dev_lock);
879 exit_err:
880         if (hash)
881                 cfs_hash_putref(hash);
882         class_handle_unhash(&export->exp_handle);
883         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
884         obd_destroy_export(export);
885         OBD_FREE_PTR(export);
886         return ERR_PTR(rc);
887 }
888 EXPORT_SYMBOL(class_new_export);
889
890 void class_unlink_export(struct obd_export *exp)
891 {
892         class_handle_unhash(&exp->exp_handle);
893
894         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
895         /* delete an uuid-export hashitem from hashtables */
896         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
897                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
898                              &exp->exp_client_uuid,
899                              &exp->exp_uuid_hash);
900
901         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
902         cfs_list_del_init(&exp->exp_obd_chain_timed);
903         exp->exp_obd->obd_num_exports--;
904         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
905         class_export_put(exp);
906 }
907 EXPORT_SYMBOL(class_unlink_export);
908
909 /* Import management functions */
910 void class_import_destroy(struct obd_import *imp)
911 {
912         ENTRY;
913
914         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
915                 imp->imp_obd->obd_name);
916
917         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
918
919         ptlrpc_put_connection_superhack(imp->imp_connection);
920
921         while (!cfs_list_empty(&imp->imp_conn_list)) {
922                 struct obd_import_conn *imp_conn;
923
924                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
925                                           struct obd_import_conn, oic_item);
926                 cfs_list_del_init(&imp_conn->oic_item);
927                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
928                 OBD_FREE(imp_conn, sizeof(*imp_conn));
929         }
930
931         LASSERT(imp->imp_sec == NULL);
932         class_decref(imp->imp_obd, "import", imp);
933         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
934         EXIT;
935 }
936
937 static void import_handle_addref(void *import)
938 {
939         class_import_get(import);
940 }
941
942 struct obd_import *class_import_get(struct obd_import *import)
943 {
944         cfs_atomic_inc(&import->imp_refcount);
945         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
946                cfs_atomic_read(&import->imp_refcount),
947                import->imp_obd->obd_name);
948         return import;
949 }
950 EXPORT_SYMBOL(class_import_get);
951
952 void class_import_put(struct obd_import *imp)
953 {
954         ENTRY;
955
956         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
957         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
958
959         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
960                cfs_atomic_read(&imp->imp_refcount) - 1,
961                imp->imp_obd->obd_name);
962
963         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
964                 CDEBUG(D_INFO, "final put import %p\n", imp);
965                 obd_zombie_import_add(imp);
966         }
967
968         EXIT;
969 }
970 EXPORT_SYMBOL(class_import_put);
971
972 static void init_imp_at(struct imp_at *at) {
973         int i;
974         at_init(&at->iat_net_latency, 0, 0);
975         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
976                 /* max service estimates are tracked on the server side, so
977                    don't use the AT history here, just use the last reported
978                    val. (But keep hist for proc histogram, worst_ever) */
979                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
980                         AT_FLG_NOHIST);
981         }
982 }
983
984 struct obd_import *class_new_import(struct obd_device *obd)
985 {
986         struct obd_import *imp;
987
988         OBD_ALLOC(imp, sizeof(*imp));
989         if (imp == NULL)
990                 return NULL;
991
992         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
993         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
994         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
995         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
996         cfs_spin_lock_init(&imp->imp_lock);
997         imp->imp_last_success_conn = 0;
998         imp->imp_state = LUSTRE_IMP_NEW;
999         imp->imp_obd = class_incref(obd, "import", imp);
1000         cfs_mutex_init(&imp->imp_sec_mutex);
1001         cfs_waitq_init(&imp->imp_recovery_waitq);
1002
1003         cfs_atomic_set(&imp->imp_refcount, 2);
1004         cfs_atomic_set(&imp->imp_unregistering, 0);
1005         cfs_atomic_set(&imp->imp_inflight, 0);
1006         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1007         cfs_atomic_set(&imp->imp_inval_count, 0);
1008         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1009         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1010         class_handle_hash(&imp->imp_handle, import_handle_addref);
1011         init_imp_at(&imp->imp_at);
1012
1013         /* the default magic is V2, will be used in connect RPC, and
1014          * then adjusted according to the flags in request/reply. */
1015         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1016
1017         return imp;
1018 }
1019 EXPORT_SYMBOL(class_new_import);
1020
1021 void class_destroy_import(struct obd_import *import)
1022 {
1023         LASSERT(import != NULL);
1024         LASSERT(import != LP_POISON);
1025
1026         class_handle_unhash(&import->imp_handle);
1027
1028         cfs_spin_lock(&import->imp_lock);
1029         import->imp_generation++;
1030         cfs_spin_unlock(&import->imp_lock);
1031         class_import_put(import);
1032 }
1033 EXPORT_SYMBOL(class_destroy_import);
1034
1035 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1036
1037 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1038 {
1039         cfs_spin_lock(&exp->exp_locks_list_guard);
1040
1041         LASSERT(lock->l_exp_refs_nr >= 0);
1042
1043         if (lock->l_exp_refs_target != NULL &&
1044             lock->l_exp_refs_target != exp) {
1045                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1046                               exp, lock, lock->l_exp_refs_target);
1047         }
1048         if ((lock->l_exp_refs_nr ++) == 0) {
1049                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1050                 lock->l_exp_refs_target = exp;
1051         }
1052         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1053                lock, exp, lock->l_exp_refs_nr);
1054         cfs_spin_unlock(&exp->exp_locks_list_guard);
1055 }
1056 EXPORT_SYMBOL(__class_export_add_lock_ref);
1057
1058 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1059 {
1060         cfs_spin_lock(&exp->exp_locks_list_guard);
1061         LASSERT(lock->l_exp_refs_nr > 0);
1062         if (lock->l_exp_refs_target != exp) {
1063                 LCONSOLE_WARN("lock %p, "
1064                               "mismatching export pointers: %p, %p\n",
1065                               lock, lock->l_exp_refs_target, exp);
1066         }
1067         if (-- lock->l_exp_refs_nr == 0) {
1068                 cfs_list_del_init(&lock->l_exp_refs_link);
1069                 lock->l_exp_refs_target = NULL;
1070         }
1071         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1072                lock, exp, lock->l_exp_refs_nr);
1073         cfs_spin_unlock(&exp->exp_locks_list_guard);
1074 }
1075 EXPORT_SYMBOL(__class_export_del_lock_ref);
1076 #endif
1077
1078 /* A connection defines an export context in which preallocation can
1079    be managed. This releases the export pointer reference, and returns
1080    the export handle, so the export refcount is 1 when this function
1081    returns. */
1082 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1083                   struct obd_uuid *cluuid)
1084 {
1085         struct obd_export *export;
1086         LASSERT(conn != NULL);
1087         LASSERT(obd != NULL);
1088         LASSERT(cluuid != NULL);
1089         ENTRY;
1090
1091         export = class_new_export(obd, cluuid);
1092         if (IS_ERR(export))
1093                 RETURN(PTR_ERR(export));
1094
1095         conn->cookie = export->exp_handle.h_cookie;
1096         class_export_put(export);
1097
1098         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1099                cluuid->uuid, conn->cookie);
1100         RETURN(0);
1101 }
1102 EXPORT_SYMBOL(class_connect);
1103
1104 /* if export is involved in recovery then clean up related things */
1105 void class_export_recovery_cleanup(struct obd_export *exp)
1106 {
1107         struct obd_device *obd = exp->exp_obd;
1108
1109         cfs_spin_lock(&obd->obd_recovery_task_lock);
1110         if (exp->exp_delayed)
1111                 obd->obd_delayed_clients--;
1112         if (obd->obd_recovering && exp->exp_in_recovery) {
1113                 cfs_spin_lock(&exp->exp_lock);
1114                 exp->exp_in_recovery = 0;
1115                 cfs_spin_unlock(&exp->exp_lock);
1116                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1117                 cfs_atomic_dec(&obd->obd_connected_clients);
1118         }
1119         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1120         /** Cleanup req replay fields */
1121         if (exp->exp_req_replay_needed) {
1122                 cfs_spin_lock(&exp->exp_lock);
1123                 exp->exp_req_replay_needed = 0;
1124                 cfs_spin_unlock(&exp->exp_lock);
1125                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1126                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1127         }
1128         /** Cleanup lock replay data */
1129         if (exp->exp_lock_replay_needed) {
1130                 cfs_spin_lock(&exp->exp_lock);
1131                 exp->exp_lock_replay_needed = 0;
1132                 cfs_spin_unlock(&exp->exp_lock);
1133                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1134                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1135         }
1136 }
1137
1138 /* This function removes 1-3 references from the export:
1139  * 1 - for export pointer passed
1140  * and if disconnect really need
1141  * 2 - removing from hash
1142  * 3 - in client_unlink_export
1143  * The export pointer passed to this function can destroyed */
1144 int class_disconnect(struct obd_export *export)
1145 {
1146         int already_disconnected;
1147         ENTRY;
1148
1149         if (export == NULL) {
1150                 CWARN("attempting to free NULL export %p\n", export);
1151                 RETURN(-EINVAL);
1152         }
1153
1154         cfs_spin_lock(&export->exp_lock);
1155         already_disconnected = export->exp_disconnected;
1156         export->exp_disconnected = 1;
1157         cfs_spin_unlock(&export->exp_lock);
1158
1159         /* class_cleanup(), abort_recovery(), and class_fail_export()
1160          * all end up in here, and if any of them race we shouldn't
1161          * call extra class_export_puts(). */
1162         if (already_disconnected) {
1163                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1164                 GOTO(no_disconn, already_disconnected);
1165         }
1166
1167         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1168                export->exp_handle.h_cookie);
1169
1170         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1171                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1172                              &export->exp_connection->c_peer.nid,
1173                              &export->exp_nid_hash);
1174
1175         class_export_recovery_cleanup(export);
1176         class_unlink_export(export);
1177 no_disconn:
1178         class_export_put(export);
1179         RETURN(0);
1180 }
1181 EXPORT_SYMBOL(class_disconnect);
1182
1183 /* Return non-zero for a fully connected export */
1184 int class_connected_export(struct obd_export *exp)
1185 {
1186         if (exp) {
1187                 int connected;
1188                 cfs_spin_lock(&exp->exp_lock);
1189                 connected = (exp->exp_conn_cnt > 0);
1190                 cfs_spin_unlock(&exp->exp_lock);
1191                 return connected;
1192         }
1193         return 0;
1194 }
1195 EXPORT_SYMBOL(class_connected_export);
1196
1197 static void class_disconnect_export_list(cfs_list_t *list,
1198                                          enum obd_option flags)
1199 {
1200         int rc;
1201         struct obd_export *exp;
1202         ENTRY;
1203
1204         /* It's possible that an export may disconnect itself, but
1205          * nothing else will be added to this list. */
1206         while (!cfs_list_empty(list)) {
1207                 exp = cfs_list_entry(list->next, struct obd_export,
1208                                      exp_obd_chain);
1209                 /* need for safe call CDEBUG after obd_disconnect */
1210                 class_export_get(exp);
1211
1212                 cfs_spin_lock(&exp->exp_lock);
1213                 exp->exp_flags = flags;
1214                 cfs_spin_unlock(&exp->exp_lock);
1215
1216                 if (obd_uuid_equals(&exp->exp_client_uuid,
1217                                     &exp->exp_obd->obd_uuid)) {
1218                         CDEBUG(D_HA,
1219                                "exp %p export uuid == obd uuid, don't discon\n",
1220                                exp);
1221                         /* Need to delete this now so we don't end up pointing
1222                          * to work_list later when this export is cleaned up. */
1223                         cfs_list_del_init(&exp->exp_obd_chain);
1224                         class_export_put(exp);
1225                         continue;
1226                 }
1227
1228                 class_export_get(exp);
1229                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1230                        "last request at "CFS_TIME_T"\n",
1231                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1232                        exp, exp->exp_last_request_time);
1233                 /* release one export reference anyway */
1234                 rc = obd_disconnect(exp);
1235
1236                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1237                        obd_export_nid2str(exp), exp, rc);
1238                 class_export_put(exp);
1239         }
1240         EXIT;
1241 }
1242
1243 void class_disconnect_exports(struct obd_device *obd)
1244 {
1245         cfs_list_t work_list;
1246         ENTRY;
1247
1248         /* Move all of the exports from obd_exports to a work list, en masse. */
1249         CFS_INIT_LIST_HEAD(&work_list);
1250         cfs_spin_lock(&obd->obd_dev_lock);
1251         cfs_list_splice_init(&obd->obd_exports, &work_list);
1252         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1253         cfs_spin_unlock(&obd->obd_dev_lock);
1254
1255         if (!cfs_list_empty(&work_list)) {
1256                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1257                        "disconnecting them\n", obd->obd_minor, obd);
1258                 class_disconnect_export_list(&work_list,
1259                                              exp_flags_from_obd(obd));
1260         } else
1261                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1262                        obd->obd_minor, obd);
1263         EXIT;
1264 }
1265 EXPORT_SYMBOL(class_disconnect_exports);
1266
1267 /* Remove exports that have not completed recovery.
1268  */
1269 void class_disconnect_stale_exports(struct obd_device *obd,
1270                                     int (*test_export)(struct obd_export *))
1271 {
1272         cfs_list_t work_list;
1273         cfs_list_t *pos, *n;
1274         struct obd_export *exp;
1275         int evicted = 0;
1276         ENTRY;
1277
1278         CFS_INIT_LIST_HEAD(&work_list);
1279         cfs_spin_lock(&obd->obd_dev_lock);
1280         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1281                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1282                 if (test_export(exp))
1283                         continue;
1284
1285                 /* don't count self-export as client */
1286                 if (obd_uuid_equals(&exp->exp_client_uuid,
1287                                     &exp->exp_obd->obd_uuid))
1288                         continue;
1289
1290                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1291                 evicted++;
1292                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1293                        obd->obd_name, exp->exp_client_uuid.uuid,
1294                        exp->exp_connection == NULL ? "<unknown>" :
1295                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1296                 print_export_data(exp, "EVICTING", 0);
1297         }
1298         cfs_spin_unlock(&obd->obd_dev_lock);
1299
1300         if (evicted) {
1301                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1302                        obd->obd_name, evicted);
1303                 obd->obd_stale_clients += evicted;
1304         }
1305         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1306                                                  OBD_OPT_ABORT_RECOV);
1307         EXIT;
1308 }
1309 EXPORT_SYMBOL(class_disconnect_stale_exports);
1310
1311 void class_fail_export(struct obd_export *exp)
1312 {
1313         int rc, already_failed;
1314
1315         cfs_spin_lock(&exp->exp_lock);
1316         already_failed = exp->exp_failed;
1317         exp->exp_failed = 1;
1318         cfs_spin_unlock(&exp->exp_lock);
1319
1320         if (already_failed) {
1321                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1322                        exp, exp->exp_client_uuid.uuid);
1323                 return;
1324         }
1325
1326         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1327                exp, exp->exp_client_uuid.uuid);
1328
1329         if (obd_dump_on_timeout)
1330                 libcfs_debug_dumplog();
1331
1332         /* Most callers into obd_disconnect are removing their own reference
1333          * (request, for example) in addition to the one from the hash table.
1334          * We don't have such a reference here, so make one. */
1335         class_export_get(exp);
1336         rc = obd_disconnect(exp);
1337         if (rc)
1338                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1339         else
1340                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1341                        exp, exp->exp_client_uuid.uuid);
1342 }
1343 EXPORT_SYMBOL(class_fail_export);
1344
1345 char *obd_export_nid2str(struct obd_export *exp)
1346 {
1347         if (exp->exp_connection != NULL)
1348                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1349
1350         return "(no nid)";
1351 }
1352 EXPORT_SYMBOL(obd_export_nid2str);
1353
1354 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1355 {
1356         struct obd_export *doomed_exp = NULL;
1357         int exports_evicted = 0;
1358
1359         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1360
1361         do {
1362                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1363                 if (doomed_exp == NULL)
1364                         break;
1365
1366                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1367                          "nid %s found, wanted nid %s, requested nid %s\n",
1368                          obd_export_nid2str(doomed_exp),
1369                          libcfs_nid2str(nid_key), nid);
1370                 LASSERTF(doomed_exp != obd->obd_self_export,
1371                          "self-export is hashed by NID?\n");
1372                 exports_evicted++;
1373                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1374                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1375                        exports_evicted);
1376                 class_fail_export(doomed_exp);
1377                 class_export_put(doomed_exp);
1378         } while (1);
1379
1380         if (!exports_evicted)
1381                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1382                        obd->obd_name, nid);
1383         return exports_evicted;
1384 }
1385 EXPORT_SYMBOL(obd_export_evict_by_nid);
1386
1387 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1388 {
1389         struct obd_export *doomed_exp = NULL;
1390         struct obd_uuid doomed_uuid;
1391         int exports_evicted = 0;
1392
1393         obd_str2uuid(&doomed_uuid, uuid);
1394         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1395                 CERROR("%s: can't evict myself\n", obd->obd_name);
1396                 return exports_evicted;
1397         }
1398
1399         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1400
1401         if (doomed_exp == NULL) {
1402                 CERROR("%s: can't disconnect %s: no exports found\n",
1403                        obd->obd_name, uuid);
1404         } else {
1405                 CWARN("%s: evicting %s at adminstrative request\n",
1406                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1407                 class_fail_export(doomed_exp);
1408                 class_export_put(doomed_exp);
1409                 exports_evicted++;
1410         }
1411
1412         return exports_evicted;
1413 }
1414 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1415
1416 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1417 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1418 EXPORT_SYMBOL(class_export_dump_hook);
1419 #endif
1420
1421 static void print_export_data(struct obd_export *exp, const char *status,
1422                               int locks)
1423 {
1424         struct ptlrpc_reply_state *rs;
1425         struct ptlrpc_reply_state *first_reply = NULL;
1426         int nreplies = 0;
1427
1428         cfs_spin_lock(&exp->exp_lock);
1429         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1430                                 rs_exp_list) {
1431                 if (nreplies == 0)
1432                         first_reply = rs;
1433                 nreplies++;
1434         }
1435         cfs_spin_unlock(&exp->exp_lock);
1436
1437         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1438                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1439                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1440                cfs_atomic_read(&exp->exp_rpc_count),
1441                cfs_atomic_read(&exp->exp_cb_count),
1442                cfs_atomic_read(&exp->exp_locks_count),
1443                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1444                nreplies, first_reply, nreplies > 3 ? "..." : "",
1445                exp->exp_last_committed);
1446 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1447         if (locks && class_export_dump_hook != NULL)
1448                 class_export_dump_hook(exp);
1449 #endif
1450 }
1451
1452 void dump_exports(struct obd_device *obd, int locks)
1453 {
1454         struct obd_export *exp;
1455
1456         cfs_spin_lock(&obd->obd_dev_lock);
1457         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1458                 print_export_data(exp, "ACTIVE", locks);
1459         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1460                 print_export_data(exp, "UNLINKED", locks);
1461         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1462                 print_export_data(exp, "DELAYED", locks);
1463         cfs_spin_unlock(&obd->obd_dev_lock);
1464         cfs_spin_lock(&obd_zombie_impexp_lock);
1465         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1466                 print_export_data(exp, "ZOMBIE", locks);
1467         cfs_spin_unlock(&obd_zombie_impexp_lock);
1468 }
1469 EXPORT_SYMBOL(dump_exports);
1470
1471 void obd_exports_barrier(struct obd_device *obd)
1472 {
1473         int waited = 2;
1474         LASSERT(cfs_list_empty(&obd->obd_exports));
1475         cfs_spin_lock(&obd->obd_dev_lock);
1476         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1477                 cfs_spin_unlock(&obd->obd_dev_lock);
1478                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1479                                                    cfs_time_seconds(waited));
1480                 if (waited > 5 && IS_PO2(waited)) {
1481                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1482                                       "more than %d seconds. "
1483                                       "The obd refcount = %d. Is it stuck?\n",
1484                                       obd->obd_name, waited,
1485                                       cfs_atomic_read(&obd->obd_refcount));
1486                         dump_exports(obd, 1);
1487                 }
1488                 waited *= 2;
1489                 cfs_spin_lock(&obd->obd_dev_lock);
1490         }
1491         cfs_spin_unlock(&obd->obd_dev_lock);
1492 }
1493 EXPORT_SYMBOL(obd_exports_barrier);
1494
1495 /* Total amount of zombies to be destroyed */
1496 static int zombies_count = 0;
1497
1498 /**
1499  * kill zombie imports and exports
1500  */
1501 void obd_zombie_impexp_cull(void)
1502 {
1503         struct obd_import *import;
1504         struct obd_export *export;
1505         ENTRY;
1506
1507         do {
1508                 cfs_spin_lock(&obd_zombie_impexp_lock);
1509
1510                 import = NULL;
1511                 if (!cfs_list_empty(&obd_zombie_imports)) {
1512                         import = cfs_list_entry(obd_zombie_imports.next,
1513                                                 struct obd_import,
1514                                                 imp_zombie_chain);
1515                         cfs_list_del_init(&import->imp_zombie_chain);
1516                 }
1517
1518                 export = NULL;
1519                 if (!cfs_list_empty(&obd_zombie_exports)) {
1520                         export = cfs_list_entry(obd_zombie_exports.next,
1521                                                 struct obd_export,
1522                                                 exp_obd_chain);
1523                         cfs_list_del_init(&export->exp_obd_chain);
1524                 }
1525
1526                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1527
1528                 if (import != NULL) {
1529                         class_import_destroy(import);
1530                         cfs_spin_lock(&obd_zombie_impexp_lock);
1531                         zombies_count--;
1532                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1533                 }
1534
1535                 if (export != NULL) {
1536                         class_export_destroy(export);
1537                         cfs_spin_lock(&obd_zombie_impexp_lock);
1538                         zombies_count--;
1539                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1540                 }
1541
1542                 cfs_cond_resched();
1543         } while (import != NULL || export != NULL);
1544         EXIT;
1545 }
1546
1547 static cfs_completion_t         obd_zombie_start;
1548 static cfs_completion_t         obd_zombie_stop;
1549 static unsigned long            obd_zombie_flags;
1550 static cfs_waitq_t              obd_zombie_waitq;
1551 static pid_t                    obd_zombie_pid;
1552
1553 enum {
1554         OBD_ZOMBIE_STOP   = 1 << 1
1555 };
1556
1557 /**
1558  * check for work for kill zombie import/export thread.
1559  */
1560 static int obd_zombie_impexp_check(void *arg)
1561 {
1562         int rc;
1563
1564         cfs_spin_lock(&obd_zombie_impexp_lock);
1565         rc = (zombies_count == 0) &&
1566              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1567         cfs_spin_unlock(&obd_zombie_impexp_lock);
1568
1569         RETURN(rc);
1570 }
1571
1572 /**
1573  * Add export to the obd_zombe thread and notify it.
1574  */
1575 static void obd_zombie_export_add(struct obd_export *exp) {
1576         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1577         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1578         cfs_list_del_init(&exp->exp_obd_chain);
1579         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1580         cfs_spin_lock(&obd_zombie_impexp_lock);
1581         zombies_count++;
1582         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1583         cfs_spin_unlock(&obd_zombie_impexp_lock);
1584
1585         obd_zombie_impexp_notify();
1586 }
1587
1588 /**
1589  * Add import to the obd_zombe thread and notify it.
1590  */
1591 static void obd_zombie_import_add(struct obd_import *imp) {
1592         LASSERT(imp->imp_sec == NULL);
1593         cfs_spin_lock(&obd_zombie_impexp_lock);
1594         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1595         zombies_count++;
1596         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1597         cfs_spin_unlock(&obd_zombie_impexp_lock);
1598
1599         obd_zombie_impexp_notify();
1600 }
1601
1602 /**
1603  * notify import/export destroy thread about new zombie.
1604  */
1605 static void obd_zombie_impexp_notify(void)
1606 {
1607         /*
1608          * Make sure obd_zomebie_impexp_thread get this notification.
1609          * It is possible this signal only get by obd_zombie_barrier, and
1610          * barrier gulps this notification and sleeps away and hangs ensues
1611          */
1612         cfs_waitq_broadcast(&obd_zombie_waitq);
1613 }
1614
1615 /**
1616  * check whether obd_zombie is idle
1617  */
1618 static int obd_zombie_is_idle(void)
1619 {
1620         int rc;
1621
1622         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1623         cfs_spin_lock(&obd_zombie_impexp_lock);
1624         rc = (zombies_count == 0);
1625         cfs_spin_unlock(&obd_zombie_impexp_lock);
1626         return rc;
1627 }
1628
1629 /**
1630  * wait when obd_zombie import/export queues become empty
1631  */
1632 void obd_zombie_barrier(void)
1633 {
1634         struct l_wait_info lwi = { 0 };
1635
1636         if (obd_zombie_pid == cfs_curproc_pid())
1637                 /* don't wait for myself */
1638                 return;
1639         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1640 }
1641 EXPORT_SYMBOL(obd_zombie_barrier);
1642
1643 #ifdef __KERNEL__
1644
1645 /**
1646  * destroy zombie export/import thread.
1647  */
1648 static int obd_zombie_impexp_thread(void *unused)
1649 {
1650         int rc;
1651
1652         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1653                 cfs_complete(&obd_zombie_start);
1654                 RETURN(rc);
1655         }
1656
1657         cfs_complete(&obd_zombie_start);
1658
1659         obd_zombie_pid = cfs_curproc_pid();
1660
1661         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1662                 struct l_wait_info lwi = { 0 };
1663
1664                 l_wait_event(obd_zombie_waitq,
1665                              !obd_zombie_impexp_check(NULL), &lwi);
1666                 obd_zombie_impexp_cull();
1667
1668                 /*
1669                  * Notify obd_zombie_barrier callers that queues
1670                  * may be empty.
1671                  */
1672                 cfs_waitq_signal(&obd_zombie_waitq);
1673         }
1674
1675         cfs_complete(&obd_zombie_stop);
1676
1677         RETURN(0);
1678 }
1679
1680 #else /* ! KERNEL */
1681
1682 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1683 static void *obd_zombie_impexp_work_cb;
1684 static void *obd_zombie_impexp_idle_cb;
1685
1686 int obd_zombie_impexp_kill(void *arg)
1687 {
1688         int rc = 0;
1689
1690         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1691                 obd_zombie_impexp_cull();
1692                 rc = 1;
1693         }
1694         cfs_atomic_dec(&zombie_recur);
1695         return rc;
1696 }
1697
1698 #endif
1699
1700 /**
1701  * start destroy zombie import/export thread
1702  */
1703 int obd_zombie_impexp_init(void)
1704 {
1705         int rc;
1706
1707         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1708         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1709         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1710         cfs_init_completion(&obd_zombie_start);
1711         cfs_init_completion(&obd_zombie_stop);
1712         cfs_waitq_init(&obd_zombie_waitq);
1713         obd_zombie_pid = 0;
1714
1715 #ifdef __KERNEL__
1716         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1717         if (rc < 0)
1718                 RETURN(rc);
1719
1720         cfs_wait_for_completion(&obd_zombie_start);
1721 #else
1722
1723         obd_zombie_impexp_work_cb =
1724                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1725                                                  &obd_zombie_impexp_kill, NULL);
1726
1727         obd_zombie_impexp_idle_cb =
1728                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1729                                                  &obd_zombie_impexp_check, NULL);
1730         rc = 0;
1731 #endif
1732         RETURN(rc);
1733 }
1734 /**
1735  * stop destroy zombie import/export thread
1736  */
1737 void obd_zombie_impexp_stop(void)
1738 {
1739         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1740         obd_zombie_impexp_notify();
1741 #ifdef __KERNEL__
1742         cfs_wait_for_completion(&obd_zombie_stop);
1743 #else
1744         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1745         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1746 #endif
1747 }
1748
1749 /***** Kernel-userspace comm helpers *******/
1750
1751 /* Get length of entire message, including header */
1752 int kuc_len(int payload_len)
1753 {
1754         return sizeof(struct kuc_hdr) + payload_len;
1755 }
1756 EXPORT_SYMBOL(kuc_len);
1757
1758 /* Get a pointer to kuc header, given a ptr to the payload
1759  * @param p Pointer to payload area
1760  * @returns Pointer to kuc header
1761  */
1762 struct kuc_hdr * kuc_ptr(void *p)
1763 {
1764         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1765         LASSERT(lh->kuc_magic == KUC_MAGIC);
1766         return lh;
1767 }
1768 EXPORT_SYMBOL(kuc_ptr);
1769
1770 /* Test if payload is part of kuc message
1771  * @param p Pointer to payload area
1772  * @returns boolean
1773  */
1774 int kuc_ispayload(void *p)
1775 {
1776         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1777
1778         if (kh->kuc_magic == KUC_MAGIC)
1779                 return 1;
1780         else
1781                 return 0;
1782 }
1783 EXPORT_SYMBOL(kuc_ispayload);
1784
1785 /* Alloc space for a message, and fill in header
1786  * @return Pointer to payload area
1787  */
1788 void *kuc_alloc(int payload_len, int transport, int type)
1789 {
1790         struct kuc_hdr *lh;
1791         int len = kuc_len(payload_len);
1792
1793         OBD_ALLOC(lh, len);
1794         if (lh == NULL)
1795                 return ERR_PTR(-ENOMEM);
1796
1797         lh->kuc_magic = KUC_MAGIC;
1798         lh->kuc_transport = transport;
1799         lh->kuc_msgtype = type;
1800         lh->kuc_msglen = len;
1801
1802         return (void *)(lh + 1);
1803 }
1804 EXPORT_SYMBOL(kuc_alloc);
1805
1806 /* Takes pointer to payload area */
1807 inline void kuc_free(void *p, int payload_len)
1808 {
1809         struct kuc_hdr *lh = kuc_ptr(p);
1810         OBD_FREE(lh, kuc_len(payload_len));
1811 }
1812 EXPORT_SYMBOL(kuc_free);
1813
1814
1815