Whamcloud - gitweb
LU-1789 protocol: add support for lightweight connection
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types (defined elsewhere); walked by
 * class_search_type() under obd_types_lock. */
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
/* NOTE(review): presumably queues of imports/exports awaiting deferred
 * destruction, guarded by obd_zombie_impexp_lock -- the users are outside
 * this part of the file, confirm there. */
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
/* Function pointer called by class_export_destroy() to release an export's
 * connection; it is assigned outside this file. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         cfs_spin_lock(&obd_types_lock);
253         cfs_list_del(&type->typ_chain);
254         cfs_spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         RETURN(0);
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *         pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283         ENTRY;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0]='\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         cfs_write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 RETURN(ERR_PTR(-EOVERFLOW));
341         }
342
343         if (IS_ERR(result)) {
344                 obd_device_free(newdev);
345                 class_put_type(type);
346         } else {
347                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                        result->obd_name, result);
349         }
350         RETURN(result);
351 }
352
353 void class_release_dev(struct obd_device *obd)
354 {
355         struct obd_type *obd_type = obd->obd_type;
356
357         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361         LASSERT(obd_type != NULL);
362
363         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365
366         cfs_write_lock(&obd_dev_lock);
367         obd_devs[obd->obd_minor] = NULL;
368         cfs_write_unlock(&obd_dev_lock);
369         obd_device_free(obd);
370
371         class_put_type(obd_type);
372 }
373
374 int class_name2dev(const char *name)
375 {
376         int i;
377
378         if (!name)
379                 return -1;
380
381         cfs_read_lock(&obd_dev_lock);
382         for (i = 0; i < class_devno_max(); i++) {
383                 struct obd_device *obd = class_num2obd(i);
384
385                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386                         /* Make sure we finished attaching before we give
387                            out any references */
388                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389                         if (obd->obd_attached) {
390                                 cfs_read_unlock(&obd_dev_lock);
391                                 return i;
392                         }
393                         break;
394                 }
395         }
396         cfs_read_unlock(&obd_dev_lock);
397
398         return -1;
399 }
400 EXPORT_SYMBOL(class_name2dev);
401
402 struct obd_device *class_name2obd(const char *name)
403 {
404         int dev = class_name2dev(name);
405
406         if (dev < 0 || dev > class_devno_max())
407                 return NULL;
408         return class_num2obd(dev);
409 }
410 EXPORT_SYMBOL(class_name2obd);
411
412 int class_uuid2dev(struct obd_uuid *uuid)
413 {
414         int i;
415
416         cfs_read_lock(&obd_dev_lock);
417         for (i = 0; i < class_devno_max(); i++) {
418                 struct obd_device *obd = class_num2obd(i);
419
420                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         cfs_read_unlock(&obd_dev_lock);
423                         return i;
424                 }
425         }
426         cfs_read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430 EXPORT_SYMBOL(class_uuid2dev);
431
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 {
434         int dev = class_uuid2dev(uuid);
435         if (dev < 0)
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_uuid2obd);
440
441 /**
442  * Get obd device from ::obd_devs[]
443  *
444  * \param num [in] array index
445  *
446  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447  *         otherwise return the obd device there.
448  */
449 struct obd_device *class_num2obd(int num)
450 {
451         struct obd_device *obd = NULL;
452
453         if (num < class_devno_max()) {
454                 obd = obd_devs[num];
455                 if (obd == NULL)
456                         return NULL;
457
458                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459                          "%p obd_magic %08x != %08x\n",
460                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461                 LASSERTF(obd->obd_minor == num,
462                          "%p obd_minor %0d != %0d\n",
463                          obd, obd->obd_minor, num);
464         }
465
466         return obd;
467 }
468 EXPORT_SYMBOL(class_num2obd);
469
470 void class_obd_list(void)
471 {
472         char *status;
473         int i;
474
475         cfs_read_lock(&obd_dev_lock);
476         for (i = 0; i < class_devno_max(); i++) {
477                 struct obd_device *obd = class_num2obd(i);
478
479                 if (obd == NULL)
480                         continue;
481                 if (obd->obd_stopping)
482                         status = "ST";
483                 else if (obd->obd_set_up)
484                         status = "UP";
485                 else if (obd->obd_attached)
486                         status = "AT";
487                 else
488                         status = "--";
489                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490                          i, status, obd->obd_type->typ_name,
491                          obd->obd_name, obd->obd_uuid.uuid,
492                          cfs_atomic_read(&obd->obd_refcount));
493         }
494         cfs_read_unlock(&obd_dev_lock);
495         return;
496 }
497
498 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
499    specified, then only the client with that uuid is returned,
500    otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502                                           const char * typ_name,
503                                           struct obd_uuid *grp_uuid)
504 {
505         int i;
506
507         cfs_read_lock(&obd_dev_lock);
508         for (i = 0; i < class_devno_max(); i++) {
509                 struct obd_device *obd = class_num2obd(i);
510
511                 if (obd == NULL)
512                         continue;
513                 if ((strncmp(obd->obd_type->typ_name, typ_name,
514                              strlen(typ_name)) == 0)) {
515                         if (obd_uuid_equals(tgt_uuid,
516                                             &obd->u.cli.cl_target_uuid) &&
517                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
518                                                          &obd->obd_uuid) : 1)) {
519                                 cfs_read_unlock(&obd_dev_lock);
520                                 return obd;
521                         }
522                 }
523         }
524         cfs_read_unlock(&obd_dev_lock);
525
526         return NULL;
527 }
528 EXPORT_SYMBOL(class_find_client_obd);
529
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531    searching at *next, and if a device is found, the next index to look
532    at is saved in *next. If next is NULL, then the first matching device
533    will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
535 {
536         int i;
537
538         if (next == NULL)
539                 i = 0;
540         else if (*next >= 0 && *next < class_devno_max())
541                 i = *next;
542         else
543                 return NULL;
544
545         cfs_read_lock(&obd_dev_lock);
546         for (; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552                         if (next != NULL)
553                                 *next = i+1;
554                         cfs_read_unlock(&obd_dev_lock);
555                         return obd;
556                 }
557         }
558         cfs_read_unlock(&obd_dev_lock);
559
560         return NULL;
561 }
562 EXPORT_SYMBOL(class_devices_in_group);
563
564 /**
565  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566  * adjust sptlrpc settings accordingly.
567  */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 {
570         struct obd_device  *obd;
571         const char         *type;
572         int                 i, rc = 0, rc2;
573
574         LASSERT(namelen > 0);
575
576         cfs_read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 obd = class_num2obd(i);
579
580                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581                         continue;
582
583                 /* only notify mdc, osc, mdt, ost */
584                 type = obd->obd_type->typ_name;
585                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OST_NAME) != 0)
589                         continue;
590
591                 if (strncmp(obd->obd_name, fsname, namelen))
592                         continue;
593
594                 class_incref(obd, __FUNCTION__, obd);
595                 cfs_read_unlock(&obd_dev_lock);
596                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
597                                          sizeof(KEY_SPTLRPC_CONF),
598                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
599                 rc = rc ? rc : rc2;
600                 class_decref(obd, __FUNCTION__, obd);
601                 cfs_read_lock(&obd_dev_lock);
602         }
603         cfs_read_unlock(&obd_dev_lock);
604         return rc;
605 }
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607
608 void obd_cleanup_caches(void)
609 {
610         int rc;
611
612         ENTRY;
613         if (obd_device_cachep) {
614                 rc = cfs_mem_cache_destroy(obd_device_cachep);
615                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616                 obd_device_cachep = NULL;
617         }
618         if (obdo_cachep) {
619                 rc = cfs_mem_cache_destroy(obdo_cachep);
620                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
621                 obdo_cachep = NULL;
622         }
623         if (import_cachep) {
624                 rc = cfs_mem_cache_destroy(import_cachep);
625                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626                 import_cachep = NULL;
627         }
628         if (capa_cachep) {
629                 rc = cfs_mem_cache_destroy(capa_cachep);
630                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
631                 capa_cachep = NULL;
632         }
633         EXIT;
634 }
635
636 int obd_init_caches(void)
637 {
638         ENTRY;
639
640         LASSERT(obd_device_cachep == NULL);
641         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642                                                  sizeof(struct obd_device),
643                                                  0, 0);
644         if (!obd_device_cachep)
645                 GOTO(out, -ENOMEM);
646
647         LASSERT(obdo_cachep == NULL);
648         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
649                                            0, 0);
650         if (!obdo_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(import_cachep == NULL);
654         import_cachep = cfs_mem_cache_create("ll_import_cache",
655                                              sizeof(struct obd_import),
656                                              0, 0);
657         if (!import_cachep)
658                 GOTO(out, -ENOMEM);
659
660         LASSERT(capa_cachep == NULL);
661         capa_cachep = cfs_mem_cache_create("capa_cache",
662                                            sizeof(struct obd_capa), 0, 0);
663         if (!capa_cachep)
664                 GOTO(out, -ENOMEM);
665
666         RETURN(0);
667  out:
668         obd_cleanup_caches();
669         RETURN(-ENOMEM);
670
671 }
672
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 {
676         struct obd_export *export;
677         ENTRY;
678
679         if (!conn) {
680                 CDEBUG(D_CACHE, "looking for null handle\n");
681                 RETURN(NULL);
682         }
683
684         if (conn->cookie == -1) {  /* this means assign a new connection */
685                 CDEBUG(D_CACHE, "want a new connection\n");
686                 RETURN(NULL);
687         }
688
689         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690         export = class_handle2object(conn->cookie);
691         RETURN(export);
692 }
693 EXPORT_SYMBOL(class_conn2export);
694
695 struct obd_device *class_exp2obd(struct obd_export *exp)
696 {
697         if (exp)
698                 return exp->exp_obd;
699         return NULL;
700 }
701 EXPORT_SYMBOL(class_exp2obd);
702
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         export = class_conn2export(conn);
707         if (export) {
708                 struct obd_device *obd = export->exp_obd;
709                 class_export_put(export);
710                 return obd;
711         }
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_conn2obd);
715
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         if (obd == NULL)
720                 return NULL;
721         return obd->u.cli.cl_import;
722 }
723 EXPORT_SYMBOL(class_exp2cliimp);
724
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 {
727         struct obd_device *obd = class_conn2obd(conn);
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_conn2cliimp);
733
734 /* Export management functions */
735 static void class_export_destroy(struct obd_export *exp)
736 {
737         struct obd_device *obd = exp->exp_obd;
738         ENTRY;
739
740         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
741         LASSERT(obd != NULL);
742
743         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744                exp->exp_client_uuid.uuid, obd->obd_name);
745
746         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747         if (exp->exp_connection)
748                 ptlrpc_put_connection_superhack(exp->exp_connection);
749
750         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
754         obd_destroy_export(exp);
755         class_decref(obd, "export", exp);
756
757         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
758         EXIT;
759 }
760
/* hop_addref callback for export handles: take a reference on the export. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
765
766 static struct portals_handle_ops export_handle_ops = {
767         .hop_addref = export_handle_addref,
768         .hop_free   = NULL,
769 };
770
771 struct obd_export *class_export_get(struct obd_export *exp)
772 {
773         cfs_atomic_inc(&exp->exp_refcount);
774         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
775                cfs_atomic_read(&exp->exp_refcount));
776         return exp;
777 }
778 EXPORT_SYMBOL(class_export_get);
779
780 void class_export_put(struct obd_export *exp)
781 {
782         LASSERT(exp != NULL);
783         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
784         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
785                cfs_atomic_read(&exp->exp_refcount) - 1);
786
787         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
788                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
789                 CDEBUG(D_IOCTL, "final put %p/%s\n",
790                        exp, exp->exp_client_uuid.uuid);
791
792                 /* release nid stat refererence */
793                 lprocfs_exp_cleanup(exp);
794
795                 obd_zombie_export_add(exp);
796         }
797 }
798 EXPORT_SYMBOL(class_export_put);
799
800 /* Creates a new export, adds it to the hash table, and returns a
801  * pointer to it. The refcount is 2: one for the hash reference, and
802  * one for the pointer returned by this function. */
803 struct obd_export *class_new_export(struct obd_device *obd,
804                                     struct obd_uuid *cluuid)
805 {
806         struct obd_export *export;
807         cfs_hash_t *hash = NULL;
808         int rc = 0;
809         ENTRY;
810
811         OBD_ALLOC_PTR(export);
812         if (!export)
813                 return ERR_PTR(-ENOMEM);
814
815         export->exp_conn_cnt = 0;
816         export->exp_lock_hash = NULL;
817         export->exp_flock_hash = NULL;
818         cfs_atomic_set(&export->exp_refcount, 2);
819         cfs_atomic_set(&export->exp_rpc_count, 0);
820         cfs_atomic_set(&export->exp_cb_count, 0);
821         cfs_atomic_set(&export->exp_locks_count, 0);
822 #if LUSTRE_TRACKS_LOCK_EXP_REFS
823         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
824         cfs_spin_lock_init(&export->exp_locks_list_guard);
825 #endif
826         cfs_atomic_set(&export->exp_replay_count, 0);
827         export->exp_obd = obd;
828         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
829         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
830         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
831         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
832         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
833         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
834         class_handle_hash(&export->exp_handle, &export_handle_ops);
835         export->exp_last_request_time = cfs_time_current_sec();
836         cfs_spin_lock_init(&export->exp_lock);
837         cfs_spin_lock_init(&export->exp_rpc_lock);
838         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
839         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
840         cfs_spin_lock_init(&export->exp_bl_list_lock);
841         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
842
843         export->exp_sp_peer = LUSTRE_SP_ANY;
844         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
845         export->exp_client_uuid = *cluuid;
846         obd_init_export(export);
847
848         cfs_spin_lock(&obd->obd_dev_lock);
849          /* shouldn't happen, but might race */
850         if (obd->obd_stopping)
851                 GOTO(exit_unlock, rc = -ENODEV);
852
853         hash = cfs_hash_getref(obd->obd_uuid_hash);
854         if (hash == NULL)
855                 GOTO(exit_unlock, rc = -ENODEV);
856         cfs_spin_unlock(&obd->obd_dev_lock);
857
858         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
859                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
860                 if (rc != 0) {
861                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
862                                       obd->obd_name, cluuid->uuid, rc);
863                         GOTO(exit_err, rc = -EALREADY);
864                 }
865         }
866
867         cfs_spin_lock(&obd->obd_dev_lock);
868         if (obd->obd_stopping) {
869                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
870                 GOTO(exit_unlock, rc = -ENODEV);
871         }
872
873         class_incref(obd, "export", export);
874         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
875         cfs_list_add_tail(&export->exp_obd_chain_timed,
876                           &export->exp_obd->obd_exports_timed);
877         export->exp_obd->obd_num_exports++;
878         cfs_spin_unlock(&obd->obd_dev_lock);
879         cfs_hash_putref(hash);
880         RETURN(export);
881
882 exit_unlock:
883         cfs_spin_unlock(&obd->obd_dev_lock);
884 exit_err:
885         if (hash)
886                 cfs_hash_putref(hash);
887         class_handle_unhash(&export->exp_handle);
888         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
889         obd_destroy_export(export);
890         OBD_FREE_PTR(export);
891         return ERR_PTR(rc);
892 }
893 EXPORT_SYMBOL(class_new_export);
894
895 void class_unlink_export(struct obd_export *exp)
896 {
897         class_handle_unhash(&exp->exp_handle);
898
899         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
900         /* delete an uuid-export hashitem from hashtables */
901         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
902                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
903                              &exp->exp_client_uuid,
904                              &exp->exp_uuid_hash);
905
906         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
907         cfs_list_del_init(&exp->exp_obd_chain_timed);
908         exp->exp_obd->obd_num_exports--;
909         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
910         class_export_put(exp);
911 }
912 EXPORT_SYMBOL(class_unlink_export);
913
914 /* Import management functions */
915 void class_import_destroy(struct obd_import *imp)
916 {
917         ENTRY;
918
919         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
920                 imp->imp_obd->obd_name);
921
922         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
923
924         ptlrpc_put_connection_superhack(imp->imp_connection);
925
926         while (!cfs_list_empty(&imp->imp_conn_list)) {
927                 struct obd_import_conn *imp_conn;
928
929                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
930                                           struct obd_import_conn, oic_item);
931                 cfs_list_del_init(&imp_conn->oic_item);
932                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
933                 OBD_FREE(imp_conn, sizeof(*imp_conn));
934         }
935
936         LASSERT(imp->imp_sec == NULL);
937         class_decref(imp->imp_obd, "import", imp);
938         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
939         EXIT;
940 }
941
/* Handle-ops callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
946
947 static struct portals_handle_ops import_handle_ops = {
948         .hop_addref = import_handle_addref,
949         .hop_free   = NULL,
950 };
951
952 struct obd_import *class_import_get(struct obd_import *import)
953 {
954         cfs_atomic_inc(&import->imp_refcount);
955         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
956                cfs_atomic_read(&import->imp_refcount),
957                import->imp_obd->obd_name);
958         return import;
959 }
960 EXPORT_SYMBOL(class_import_get);
961
962 void class_import_put(struct obd_import *imp)
963 {
964         ENTRY;
965
966         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
967         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
968
969         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
970                cfs_atomic_read(&imp->imp_refcount) - 1,
971                imp->imp_obd->obd_name);
972
973         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
974                 CDEBUG(D_INFO, "final put import %p\n", imp);
975                 obd_zombie_import_add(imp);
976         }
977
978         /* catch possible import put race */
979         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
980         EXIT;
981 }
982 EXPORT_SYMBOL(class_import_put);
983
984 static void init_imp_at(struct imp_at *at) {
985         int i;
986         at_init(&at->iat_net_latency, 0, 0);
987         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
988                 /* max service estimates are tracked on the server side, so
989                    don't use the AT history here, just use the last reported
990                    val. (But keep hist for proc histogram, worst_ever) */
991                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
992                         AT_FLG_NOHIST);
993         }
994 }
995
996 struct obd_import *class_new_import(struct obd_device *obd)
997 {
998         struct obd_import *imp;
999
1000         OBD_ALLOC(imp, sizeof(*imp));
1001         if (imp == NULL)
1002                 return NULL;
1003
1004         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1005         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1006         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1007         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1008         cfs_spin_lock_init(&imp->imp_lock);
1009         imp->imp_last_success_conn = 0;
1010         imp->imp_state = LUSTRE_IMP_NEW;
1011         imp->imp_obd = class_incref(obd, "import", imp);
1012         cfs_mutex_init(&imp->imp_sec_mutex);
1013         cfs_waitq_init(&imp->imp_recovery_waitq);
1014
1015         cfs_atomic_set(&imp->imp_refcount, 2);
1016         cfs_atomic_set(&imp->imp_unregistering, 0);
1017         cfs_atomic_set(&imp->imp_inflight, 0);
1018         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1019         cfs_atomic_set(&imp->imp_inval_count, 0);
1020         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1021         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1022         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1023         init_imp_at(&imp->imp_at);
1024
1025         /* the default magic is V2, will be used in connect RPC, and
1026          * then adjusted according to the flags in request/reply. */
1027         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1028
1029         return imp;
1030 }
1031 EXPORT_SYMBOL(class_new_import);
1032
1033 void class_destroy_import(struct obd_import *import)
1034 {
1035         LASSERT(import != NULL);
1036         LASSERT(import != LP_POISON);
1037
1038         class_handle_unhash(&import->imp_handle);
1039
1040         cfs_spin_lock(&import->imp_lock);
1041         import->imp_generation++;
1042         cfs_spin_unlock(&import->imp_lock);
1043         class_import_put(import);
1044 }
1045 EXPORT_SYMBOL(class_destroy_import);
1046
1047 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1048
1049 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1050 {
1051         cfs_spin_lock(&exp->exp_locks_list_guard);
1052
1053         LASSERT(lock->l_exp_refs_nr >= 0);
1054
1055         if (lock->l_exp_refs_target != NULL &&
1056             lock->l_exp_refs_target != exp) {
1057                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1058                               exp, lock, lock->l_exp_refs_target);
1059         }
1060         if ((lock->l_exp_refs_nr ++) == 0) {
1061                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1062                 lock->l_exp_refs_target = exp;
1063         }
1064         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1065                lock, exp, lock->l_exp_refs_nr);
1066         cfs_spin_unlock(&exp->exp_locks_list_guard);
1067 }
1068 EXPORT_SYMBOL(__class_export_add_lock_ref);
1069
1070 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1071 {
1072         cfs_spin_lock(&exp->exp_locks_list_guard);
1073         LASSERT(lock->l_exp_refs_nr > 0);
1074         if (lock->l_exp_refs_target != exp) {
1075                 LCONSOLE_WARN("lock %p, "
1076                               "mismatching export pointers: %p, %p\n",
1077                               lock, lock->l_exp_refs_target, exp);
1078         }
1079         if (-- lock->l_exp_refs_nr == 0) {
1080                 cfs_list_del_init(&lock->l_exp_refs_link);
1081                 lock->l_exp_refs_target = NULL;
1082         }
1083         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1084                lock, exp, lock->l_exp_refs_nr);
1085         cfs_spin_unlock(&exp->exp_locks_list_guard);
1086 }
1087 EXPORT_SYMBOL(__class_export_del_lock_ref);
1088 #endif
1089
1090 /* A connection defines an export context in which preallocation can
1091    be managed. This releases the export pointer reference, and returns
1092    the export handle, so the export refcount is 1 when this function
1093    returns. */
1094 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1095                   struct obd_uuid *cluuid)
1096 {
1097         struct obd_export *export;
1098         LASSERT(conn != NULL);
1099         LASSERT(obd != NULL);
1100         LASSERT(cluuid != NULL);
1101         ENTRY;
1102
1103         export = class_new_export(obd, cluuid);
1104         if (IS_ERR(export))
1105                 RETURN(PTR_ERR(export));
1106
1107         conn->cookie = export->exp_handle.h_cookie;
1108         class_export_put(export);
1109
1110         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1111                cluuid->uuid, conn->cookie);
1112         RETURN(0);
1113 }
1114 EXPORT_SYMBOL(class_connect);
1115
1116 /* if export is involved in recovery then clean up related things */
1117 void class_export_recovery_cleanup(struct obd_export *exp)
1118 {
1119         struct obd_device *obd = exp->exp_obd;
1120
1121         cfs_spin_lock(&obd->obd_recovery_task_lock);
1122         if (exp->exp_delayed)
1123                 obd->obd_delayed_clients--;
1124         if (obd->obd_recovering && exp->exp_in_recovery) {
1125                 cfs_spin_lock(&exp->exp_lock);
1126                 exp->exp_in_recovery = 0;
1127                 cfs_spin_unlock(&exp->exp_lock);
1128                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1129                 cfs_atomic_dec(&obd->obd_connected_clients);
1130         }
1131         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1132         /** Cleanup req replay fields */
1133         if (exp->exp_req_replay_needed) {
1134                 cfs_spin_lock(&exp->exp_lock);
1135                 exp->exp_req_replay_needed = 0;
1136                 cfs_spin_unlock(&exp->exp_lock);
1137                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1138                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1139         }
1140         /** Cleanup lock replay data */
1141         if (exp->exp_lock_replay_needed) {
1142                 cfs_spin_lock(&exp->exp_lock);
1143                 exp->exp_lock_replay_needed = 0;
1144                 cfs_spin_unlock(&exp->exp_lock);
1145                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1146                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1147         }
1148 }
1149
1150 /* This function removes 1-3 references from the export:
1151  * 1 - for export pointer passed
1152  * and if disconnect really need
1153  * 2 - removing from hash
1154  * 3 - in client_unlink_export
1155  * The export pointer passed to this function can destroyed */
1156 int class_disconnect(struct obd_export *export)
1157 {
1158         int already_disconnected;
1159         ENTRY;
1160
1161         if (export == NULL) {
1162                 CWARN("attempting to free NULL export %p\n", export);
1163                 RETURN(-EINVAL);
1164         }
1165
1166         cfs_spin_lock(&export->exp_lock);
1167         already_disconnected = export->exp_disconnected;
1168         export->exp_disconnected = 1;
1169         cfs_spin_unlock(&export->exp_lock);
1170
1171         /* class_cleanup(), abort_recovery(), and class_fail_export()
1172          * all end up in here, and if any of them race we shouldn't
1173          * call extra class_export_puts(). */
1174         if (already_disconnected) {
1175                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1176                 GOTO(no_disconn, already_disconnected);
1177         }
1178
1179         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1180                export->exp_handle.h_cookie);
1181
1182         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1183                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1184                              &export->exp_connection->c_peer.nid,
1185                              &export->exp_nid_hash);
1186
1187         class_export_recovery_cleanup(export);
1188         class_unlink_export(export);
1189 no_disconn:
1190         class_export_put(export);
1191         RETURN(0);
1192 }
1193 EXPORT_SYMBOL(class_disconnect);
1194
1195 /* Return non-zero for a fully connected export */
1196 int class_connected_export(struct obd_export *exp)
1197 {
1198         if (exp) {
1199                 int connected;
1200                 cfs_spin_lock(&exp->exp_lock);
1201                 connected = (exp->exp_conn_cnt > 0);
1202                 cfs_spin_unlock(&exp->exp_lock);
1203                 return connected;
1204         }
1205         return 0;
1206 }
1207 EXPORT_SYMBOL(class_connected_export);
1208
1209 static void class_disconnect_export_list(cfs_list_t *list,
1210                                          enum obd_option flags)
1211 {
1212         int rc;
1213         struct obd_export *exp;
1214         ENTRY;
1215
1216         /* It's possible that an export may disconnect itself, but
1217          * nothing else will be added to this list. */
1218         while (!cfs_list_empty(list)) {
1219                 exp = cfs_list_entry(list->next, struct obd_export,
1220                                      exp_obd_chain);
1221                 /* need for safe call CDEBUG after obd_disconnect */
1222                 class_export_get(exp);
1223
1224                 cfs_spin_lock(&exp->exp_lock);
1225                 exp->exp_flags = flags;
1226                 cfs_spin_unlock(&exp->exp_lock);
1227
1228                 if (obd_uuid_equals(&exp->exp_client_uuid,
1229                                     &exp->exp_obd->obd_uuid)) {
1230                         CDEBUG(D_HA,
1231                                "exp %p export uuid == obd uuid, don't discon\n",
1232                                exp);
1233                         /* Need to delete this now so we don't end up pointing
1234                          * to work_list later when this export is cleaned up. */
1235                         cfs_list_del_init(&exp->exp_obd_chain);
1236                         class_export_put(exp);
1237                         continue;
1238                 }
1239
1240                 class_export_get(exp);
1241                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1242                        "last request at "CFS_TIME_T"\n",
1243                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1244                        exp, exp->exp_last_request_time);
1245                 /* release one export reference anyway */
1246                 rc = obd_disconnect(exp);
1247
1248                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1249                        obd_export_nid2str(exp), exp, rc);
1250                 class_export_put(exp);
1251         }
1252         EXIT;
1253 }
1254
1255 void class_disconnect_exports(struct obd_device *obd)
1256 {
1257         cfs_list_t work_list;
1258         ENTRY;
1259
1260         /* Move all of the exports from obd_exports to a work list, en masse. */
1261         CFS_INIT_LIST_HEAD(&work_list);
1262         cfs_spin_lock(&obd->obd_dev_lock);
1263         cfs_list_splice_init(&obd->obd_exports, &work_list);
1264         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1265         cfs_spin_unlock(&obd->obd_dev_lock);
1266
1267         if (!cfs_list_empty(&work_list)) {
1268                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1269                        "disconnecting them\n", obd->obd_minor, obd);
1270                 class_disconnect_export_list(&work_list,
1271                                              exp_flags_from_obd(obd));
1272         } else
1273                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1274                        obd->obd_minor, obd);
1275         EXIT;
1276 }
1277 EXPORT_SYMBOL(class_disconnect_exports);
1278
1279 /* Remove exports that have not completed recovery.
1280  */
1281 void class_disconnect_stale_exports(struct obd_device *obd,
1282                                     int (*test_export)(struct obd_export *))
1283 {
1284         cfs_list_t work_list;
1285         struct obd_export *exp, *n;
1286         int evicted = 0;
1287         ENTRY;
1288
1289         CFS_INIT_LIST_HEAD(&work_list);
1290         cfs_spin_lock(&obd->obd_dev_lock);
1291         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1292                                      exp_obd_chain) {
1293                 /* don't count self-export as client */
1294                 if (obd_uuid_equals(&exp->exp_client_uuid,
1295                                     &exp->exp_obd->obd_uuid))
1296                         continue;
1297
1298                 /* don't evict clients which have no slot in last_rcvd
1299                  * (e.g. lightweight connection) */
1300                 if (exp->exp_target_data.ted_lr_idx == -1)
1301                         continue;
1302
1303                 cfs_spin_lock(&exp->exp_lock);
1304                 if (test_export(exp)) {
1305                         cfs_spin_unlock(&exp->exp_lock);
1306                         continue;
1307                 }
1308                 exp->exp_failed = 1;
1309                 cfs_spin_unlock(&exp->exp_lock);
1310
1311                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1312                 evicted++;
1313                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1314                        obd->obd_name, exp->exp_client_uuid.uuid,
1315                        exp->exp_connection == NULL ? "<unknown>" :
1316                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1317                 print_export_data(exp, "EVICTING", 0);
1318         }
1319         cfs_spin_unlock(&obd->obd_dev_lock);
1320
1321         if (evicted) {
1322                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1323                               obd->obd_name, evicted);
1324                 obd->obd_stale_clients += evicted;
1325         }
1326         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1327                                                  OBD_OPT_ABORT_RECOV);
1328         EXIT;
1329 }
1330 EXPORT_SYMBOL(class_disconnect_stale_exports);
1331
1332 void class_fail_export(struct obd_export *exp)
1333 {
1334         int rc, already_failed;
1335
1336         cfs_spin_lock(&exp->exp_lock);
1337         already_failed = exp->exp_failed;
1338         exp->exp_failed = 1;
1339         cfs_spin_unlock(&exp->exp_lock);
1340
1341         if (already_failed) {
1342                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1343                        exp, exp->exp_client_uuid.uuid);
1344                 return;
1345         }
1346
1347         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1348                exp, exp->exp_client_uuid.uuid);
1349
1350         if (obd_dump_on_timeout)
1351                 libcfs_debug_dumplog();
1352
1353         /* need for safe call CDEBUG after obd_disconnect */
1354         class_export_get(exp);
1355
1356         /* Most callers into obd_disconnect are removing their own reference
1357          * (request, for example) in addition to the one from the hash table.
1358          * We don't have such a reference here, so make one. */
1359         class_export_get(exp);
1360         rc = obd_disconnect(exp);
1361         if (rc)
1362                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1363         else
1364                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1365                        exp, exp->exp_client_uuid.uuid);
1366         class_export_put(exp);
1367 }
1368 EXPORT_SYMBOL(class_fail_export);
1369
1370 char *obd_export_nid2str(struct obd_export *exp)
1371 {
1372         if (exp->exp_connection != NULL)
1373                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1374
1375         return "(no nid)";
1376 }
1377 EXPORT_SYMBOL(obd_export_nid2str);
1378
1379 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1380 {
1381         struct obd_export *doomed_exp = NULL;
1382         int exports_evicted = 0;
1383
1384         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1385
1386         do {
1387                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1388                 if (doomed_exp == NULL)
1389                         break;
1390
1391                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1392                          "nid %s found, wanted nid %s, requested nid %s\n",
1393                          obd_export_nid2str(doomed_exp),
1394                          libcfs_nid2str(nid_key), nid);
1395                 LASSERTF(doomed_exp != obd->obd_self_export,
1396                          "self-export is hashed by NID?\n");
1397                 exports_evicted++;
1398                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1399                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1400                        exports_evicted);
1401                 class_fail_export(doomed_exp);
1402                 class_export_put(doomed_exp);
1403         } while (1);
1404
1405         if (!exports_evicted)
1406                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1407                        obd->obd_name, nid);
1408         return exports_evicted;
1409 }
1410 EXPORT_SYMBOL(obd_export_evict_by_nid);
1411
1412 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1413 {
1414         struct obd_export *doomed_exp = NULL;
1415         struct obd_uuid doomed_uuid;
1416         int exports_evicted = 0;
1417
1418         obd_str2uuid(&doomed_uuid, uuid);
1419         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1420                 CERROR("%s: can't evict myself\n", obd->obd_name);
1421                 return exports_evicted;
1422         }
1423
1424         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1425
1426         if (doomed_exp == NULL) {
1427                 CERROR("%s: can't disconnect %s: no exports found\n",
1428                        obd->obd_name, uuid);
1429         } else {
1430                 CWARN("%s: evicting %s at adminstrative request\n",
1431                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1432                 class_fail_export(doomed_exp);
1433                 class_export_put(doomed_exp);
1434                 exports_evicted++;
1435         }
1436
1437         return exports_evicted;
1438 }
1439 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1440
1441 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1442 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1443 EXPORT_SYMBOL(class_export_dump_hook);
1444 #endif
1445
1446 static void print_export_data(struct obd_export *exp, const char *status,
1447                               int locks)
1448 {
1449         struct ptlrpc_reply_state *rs;
1450         struct ptlrpc_reply_state *first_reply = NULL;
1451         int nreplies = 0;
1452
1453         cfs_spin_lock(&exp->exp_lock);
1454         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1455                                 rs_exp_list) {
1456                 if (nreplies == 0)
1457                         first_reply = rs;
1458                 nreplies++;
1459         }
1460         cfs_spin_unlock(&exp->exp_lock);
1461
1462         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1463                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1464                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1465                cfs_atomic_read(&exp->exp_rpc_count),
1466                cfs_atomic_read(&exp->exp_cb_count),
1467                cfs_atomic_read(&exp->exp_locks_count),
1468                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1469                nreplies, first_reply, nreplies > 3 ? "..." : "",
1470                exp->exp_last_committed);
1471 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1472         if (locks && class_export_dump_hook != NULL)
1473                 class_export_dump_hook(exp);
1474 #endif
1475 }
1476
1477 void dump_exports(struct obd_device *obd, int locks)
1478 {
1479         struct obd_export *exp;
1480
1481         cfs_spin_lock(&obd->obd_dev_lock);
1482         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1483                 print_export_data(exp, "ACTIVE", locks);
1484         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1485                 print_export_data(exp, "UNLINKED", locks);
1486         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1487                 print_export_data(exp, "DELAYED", locks);
1488         cfs_spin_unlock(&obd->obd_dev_lock);
1489         cfs_spin_lock(&obd_zombie_impexp_lock);
1490         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1491                 print_export_data(exp, "ZOMBIE", locks);
1492         cfs_spin_unlock(&obd_zombie_impexp_lock);
1493 }
1494 EXPORT_SYMBOL(dump_exports);
1495
1496 void obd_exports_barrier(struct obd_device *obd)
1497 {
1498         int waited = 2;
1499         LASSERT(cfs_list_empty(&obd->obd_exports));
1500         cfs_spin_lock(&obd->obd_dev_lock);
1501         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1502                 cfs_spin_unlock(&obd->obd_dev_lock);
1503                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1504                                                    cfs_time_seconds(waited));
1505                 if (waited > 5 && IS_PO2(waited)) {
1506                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1507                                       "more than %d seconds. "
1508                                       "The obd refcount = %d. Is it stuck?\n",
1509                                       obd->obd_name, waited,
1510                                       cfs_atomic_read(&obd->obd_refcount));
1511                         dump_exports(obd, 1);
1512                 }
1513                 waited *= 2;
1514                 cfs_spin_lock(&obd->obd_dev_lock);
1515         }
1516         cfs_spin_unlock(&obd->obd_dev_lock);
1517 }
1518 EXPORT_SYMBOL(obd_exports_barrier);
1519
/* Number of zombie imports and exports queued and not yet destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1522
1523 /**
1524  * kill zombie imports and exports
1525  */
1526 void obd_zombie_impexp_cull(void)
1527 {
1528         struct obd_import *import;
1529         struct obd_export *export;
1530         ENTRY;
1531
1532         do {
1533                 cfs_spin_lock(&obd_zombie_impexp_lock);
1534
1535                 import = NULL;
1536                 if (!cfs_list_empty(&obd_zombie_imports)) {
1537                         import = cfs_list_entry(obd_zombie_imports.next,
1538                                                 struct obd_import,
1539                                                 imp_zombie_chain);
1540                         cfs_list_del_init(&import->imp_zombie_chain);
1541                 }
1542
1543                 export = NULL;
1544                 if (!cfs_list_empty(&obd_zombie_exports)) {
1545                         export = cfs_list_entry(obd_zombie_exports.next,
1546                                                 struct obd_export,
1547                                                 exp_obd_chain);
1548                         cfs_list_del_init(&export->exp_obd_chain);
1549                 }
1550
1551                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1552
1553                 if (import != NULL) {
1554                         class_import_destroy(import);
1555                         cfs_spin_lock(&obd_zombie_impexp_lock);
1556                         zombies_count--;
1557                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1558                 }
1559
1560                 if (export != NULL) {
1561                         class_export_destroy(export);
1562                         cfs_spin_lock(&obd_zombie_impexp_lock);
1563                         zombies_count--;
1564                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1565                 }
1566
1567                 cfs_cond_resched();
1568         } while (import != NULL || export != NULL);
1569         EXIT;
1570 }
1571
1572 static cfs_completion_t         obd_zombie_start;
1573 static cfs_completion_t         obd_zombie_stop;
1574 static unsigned long            obd_zombie_flags;
1575 static cfs_waitq_t              obd_zombie_waitq;
1576 static pid_t                    obd_zombie_pid;
1577
1578 enum {
1579         OBD_ZOMBIE_STOP   = 1 << 1
1580 };
1581
1582 /**
1583  * check for work for kill zombie import/export thread.
1584  */
1585 static int obd_zombie_impexp_check(void *arg)
1586 {
1587         int rc;
1588
1589         cfs_spin_lock(&obd_zombie_impexp_lock);
1590         rc = (zombies_count == 0) &&
1591              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1592         cfs_spin_unlock(&obd_zombie_impexp_lock);
1593
1594         RETURN(rc);
1595 }
1596
1597 /**
1598  * Add export to the obd_zombe thread and notify it.
1599  */
1600 static void obd_zombie_export_add(struct obd_export *exp) {
1601         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1602         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1603         cfs_list_del_init(&exp->exp_obd_chain);
1604         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1605         cfs_spin_lock(&obd_zombie_impexp_lock);
1606         zombies_count++;
1607         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1608         cfs_spin_unlock(&obd_zombie_impexp_lock);
1609
1610         obd_zombie_impexp_notify();
1611 }
1612
1613 /**
1614  * Add import to the obd_zombe thread and notify it.
1615  */
1616 static void obd_zombie_import_add(struct obd_import *imp) {
1617         LASSERT(imp->imp_sec == NULL);
1618         LASSERT(imp->imp_rq_pool == NULL);
1619         cfs_spin_lock(&obd_zombie_impexp_lock);
1620         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1621         zombies_count++;
1622         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1623         cfs_spin_unlock(&obd_zombie_impexp_lock);
1624
1625         obd_zombie_impexp_notify();
1626 }
1627
1628 /**
1629  * notify import/export destroy thread about new zombie.
1630  */
1631 static void obd_zombie_impexp_notify(void)
1632 {
1633         /*
1634          * Make sure obd_zomebie_impexp_thread get this notification.
1635          * It is possible this signal only get by obd_zombie_barrier, and
1636          * barrier gulps this notification and sleeps away and hangs ensues
1637          */
1638         cfs_waitq_broadcast(&obd_zombie_waitq);
1639 }
1640
1641 /**
1642  * check whether obd_zombie is idle
1643  */
1644 static int obd_zombie_is_idle(void)
1645 {
1646         int rc;
1647
1648         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1649         cfs_spin_lock(&obd_zombie_impexp_lock);
1650         rc = (zombies_count == 0);
1651         cfs_spin_unlock(&obd_zombie_impexp_lock);
1652         return rc;
1653 }
1654
1655 /**
1656  * wait when obd_zombie import/export queues become empty
1657  */
1658 void obd_zombie_barrier(void)
1659 {
1660         struct l_wait_info lwi = { 0 };
1661
1662         if (obd_zombie_pid == cfs_curproc_pid())
1663                 /* don't wait for myself */
1664                 return;
1665         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1666 }
1667 EXPORT_SYMBOL(obd_zombie_barrier);
1668
1669 #ifdef __KERNEL__
1670
1671 /**
1672  * destroy zombie export/import thread.
1673  */
1674 static int obd_zombie_impexp_thread(void *unused)
1675 {
1676         int rc;
1677
1678         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1679                 cfs_complete(&obd_zombie_start);
1680                 RETURN(rc);
1681         }
1682
1683         cfs_complete(&obd_zombie_start);
1684
1685         obd_zombie_pid = cfs_curproc_pid();
1686
1687         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1688                 struct l_wait_info lwi = { 0 };
1689
1690                 l_wait_event(obd_zombie_waitq,
1691                              !obd_zombie_impexp_check(NULL), &lwi);
1692                 obd_zombie_impexp_cull();
1693
1694                 /*
1695                  * Notify obd_zombie_barrier callers that queues
1696                  * may be empty.
1697                  */
1698                 cfs_waitq_signal(&obd_zombie_waitq);
1699         }
1700
1701         cfs_complete(&obd_zombie_stop);
1702
1703         RETURN(0);
1704 }
1705
1706 #else /* ! KERNEL */
1707
1708 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1709 static void *obd_zombie_impexp_work_cb;
1710 static void *obd_zombie_impexp_idle_cb;
1711
1712 int obd_zombie_impexp_kill(void *arg)
1713 {
1714         int rc = 0;
1715
1716         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1717                 obd_zombie_impexp_cull();
1718                 rc = 1;
1719         }
1720         cfs_atomic_dec(&zombie_recur);
1721         return rc;
1722 }
1723
1724 #endif
1725
1726 /**
1727  * start destroy zombie import/export thread
1728  */
1729 int obd_zombie_impexp_init(void)
1730 {
1731         int rc;
1732
1733         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1734         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1735         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1736         cfs_init_completion(&obd_zombie_start);
1737         cfs_init_completion(&obd_zombie_stop);
1738         cfs_waitq_init(&obd_zombie_waitq);
1739         obd_zombie_pid = 0;
1740
1741 #ifdef __KERNEL__
1742         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1743         if (rc < 0)
1744                 RETURN(rc);
1745
1746         cfs_wait_for_completion(&obd_zombie_start);
1747 #else
1748
1749         obd_zombie_impexp_work_cb =
1750                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1751                                                  &obd_zombie_impexp_kill, NULL);
1752
1753         obd_zombie_impexp_idle_cb =
1754                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1755                                                  &obd_zombie_impexp_check, NULL);
1756         rc = 0;
1757 #endif
1758         RETURN(rc);
1759 }
1760 /**
1761  * stop destroy zombie import/export thread
1762  */
1763 void obd_zombie_impexp_stop(void)
1764 {
1765         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766         obd_zombie_impexp_notify();
1767 #ifdef __KERNEL__
1768         cfs_wait_for_completion(&obd_zombie_stop);
1769 #else
1770         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1771         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1772 #endif
1773 }
1774
1775 /***** Kernel-userspace comm helpers *******/
1776
1777 /* Get length of entire message, including header */
1778 int kuc_len(int payload_len)
1779 {
1780         return sizeof(struct kuc_hdr) + payload_len;
1781 }
1782 EXPORT_SYMBOL(kuc_len);
1783
1784 /* Get a pointer to kuc header, given a ptr to the payload
1785  * @param p Pointer to payload area
1786  * @returns Pointer to kuc header
1787  */
1788 struct kuc_hdr * kuc_ptr(void *p)
1789 {
1790         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1791         LASSERT(lh->kuc_magic == KUC_MAGIC);
1792         return lh;
1793 }
1794 EXPORT_SYMBOL(kuc_ptr);
1795
1796 /* Test if payload is part of kuc message
1797  * @param p Pointer to payload area
1798  * @returns boolean
1799  */
1800 int kuc_ispayload(void *p)
1801 {
1802         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1803
1804         if (kh->kuc_magic == KUC_MAGIC)
1805                 return 1;
1806         else
1807                 return 0;
1808 }
1809 EXPORT_SYMBOL(kuc_ispayload);
1810
1811 /* Alloc space for a message, and fill in header
1812  * @return Pointer to payload area
1813  */
1814 void *kuc_alloc(int payload_len, int transport, int type)
1815 {
1816         struct kuc_hdr *lh;
1817         int len = kuc_len(payload_len);
1818
1819         OBD_ALLOC(lh, len);
1820         if (lh == NULL)
1821                 return ERR_PTR(-ENOMEM);
1822
1823         lh->kuc_magic = KUC_MAGIC;
1824         lh->kuc_transport = transport;
1825         lh->kuc_msgtype = type;
1826         lh->kuc_msglen = len;
1827
1828         return (void *)(lh + 1);
1829 }
1830 EXPORT_SYMBOL(kuc_alloc);
1831
1832 /* Takes pointer to payload area */
1833 inline void kuc_free(void *p, int payload_len)
1834 {
1835         struct kuc_hdr *lh = kuc_ptr(p);
1836         OBD_FREE(lh, kuc_len(payload_len));
1837 }
1838 EXPORT_SYMBOL(kuc_free);
1839
1840
1841