Whamcloud - gitweb
LU-1156 ldlm: per-export waiting flock lists for deadlock detection
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/obdclass/genops.c
39  *
40  * These are the only exported functions, they provide some generic
41  * infrastructure for managing object devices
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48 #include <obd_ost.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
51
52 extern cfs_list_t obd_types;
53 cfs_spinlock_t obd_types_lock;
54
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
59
60 cfs_list_t      obd_zombie_imports;
61 cfs_list_t      obd_zombie_exports;
62 cfs_spinlock_t  obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64 static void obd_zombie_export_add(struct obd_export *exp);
65 static void obd_zombie_import_add(struct obd_import *imp);
66 static void print_export_data(struct obd_export *exp,
67                               const char *status, int locks);
68
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70
71 /*
72  * support functions: we could use inter-module communication, but this
73  * is more portable to other OS's
74  */
75 static struct obd_device *obd_device_alloc(void)
76 {
77         struct obd_device *obd;
78
79         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80         if (obd != NULL) {
81                 obd->obd_magic = OBD_DEVICE_MAGIC;
82         }
83         return obd;
84 }
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         cfs_list_t *tmp;
103         struct obd_type *type;
104
105         cfs_spin_lock(&obd_types_lock);
106         cfs_list_for_each(tmp, &obd_types) {
107                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         cfs_spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         cfs_spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         cfs_spin_lock(&obd_types_lock);
253         cfs_list_del(&type->typ_chain);
254         cfs_spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         RETURN(0);
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *         pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283         ENTRY;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0]='\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         cfs_write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 RETURN(ERR_PTR(-EOVERFLOW));
341         }
342
343         if (IS_ERR(result)) {
344                 obd_device_free(newdev);
345                 class_put_type(type);
346         } else {
347                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                        result->obd_name, result);
349         }
350         RETURN(result);
351 }
352
353 void class_release_dev(struct obd_device *obd)
354 {
355         struct obd_type *obd_type = obd->obd_type;
356
357         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361         LASSERT(obd_type != NULL);
362
363         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365
366         cfs_write_lock(&obd_dev_lock);
367         obd_devs[obd->obd_minor] = NULL;
368         cfs_write_unlock(&obd_dev_lock);
369         obd_device_free(obd);
370
371         class_put_type(obd_type);
372 }
373
374 int class_name2dev(const char *name)
375 {
376         int i;
377
378         if (!name)
379                 return -1;
380
381         cfs_read_lock(&obd_dev_lock);
382         for (i = 0; i < class_devno_max(); i++) {
383                 struct obd_device *obd = class_num2obd(i);
384
385                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386                         /* Make sure we finished attaching before we give
387                            out any references */
388                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389                         if (obd->obd_attached) {
390                                 cfs_read_unlock(&obd_dev_lock);
391                                 return i;
392                         }
393                         break;
394                 }
395         }
396         cfs_read_unlock(&obd_dev_lock);
397
398         return -1;
399 }
400 EXPORT_SYMBOL(class_name2dev);
401
402 struct obd_device *class_name2obd(const char *name)
403 {
404         int dev = class_name2dev(name);
405
406         if (dev < 0 || dev > class_devno_max())
407                 return NULL;
408         return class_num2obd(dev);
409 }
410 EXPORT_SYMBOL(class_name2obd);
411
412 int class_uuid2dev(struct obd_uuid *uuid)
413 {
414         int i;
415
416         cfs_read_lock(&obd_dev_lock);
417         for (i = 0; i < class_devno_max(); i++) {
418                 struct obd_device *obd = class_num2obd(i);
419
420                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         cfs_read_unlock(&obd_dev_lock);
423                         return i;
424                 }
425         }
426         cfs_read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430 EXPORT_SYMBOL(class_uuid2dev);
431
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 {
434         int dev = class_uuid2dev(uuid);
435         if (dev < 0)
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_uuid2obd);
440
441 /**
442  * Get obd device from ::obd_devs[]
443  *
444  * \param num [in] array index
445  *
446  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447  *         otherwise return the obd device there.
448  */
449 struct obd_device *class_num2obd(int num)
450 {
451         struct obd_device *obd = NULL;
452
453         if (num < class_devno_max()) {
454                 obd = obd_devs[num];
455                 if (obd == NULL)
456                         return NULL;
457
458                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459                          "%p obd_magic %08x != %08x\n",
460                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461                 LASSERTF(obd->obd_minor == num,
462                          "%p obd_minor %0d != %0d\n",
463                          obd, obd->obd_minor, num);
464         }
465
466         return obd;
467 }
468 EXPORT_SYMBOL(class_num2obd);
469
470 void class_obd_list(void)
471 {
472         char *status;
473         int i;
474
475         cfs_read_lock(&obd_dev_lock);
476         for (i = 0; i < class_devno_max(); i++) {
477                 struct obd_device *obd = class_num2obd(i);
478
479                 if (obd == NULL)
480                         continue;
481                 if (obd->obd_stopping)
482                         status = "ST";
483                 else if (obd->obd_set_up)
484                         status = "UP";
485                 else if (obd->obd_attached)
486                         status = "AT";
487                 else
488                         status = "--";
489                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490                          i, status, obd->obd_type->typ_name,
491                          obd->obd_name, obd->obd_uuid.uuid,
492                          cfs_atomic_read(&obd->obd_refcount));
493         }
494         cfs_read_unlock(&obd_dev_lock);
495         return;
496 }
497
498 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
499    specified, then only the client with that uuid is returned,
500    otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502                                           const char * typ_name,
503                                           struct obd_uuid *grp_uuid)
504 {
505         int i;
506
507         cfs_read_lock(&obd_dev_lock);
508         for (i = 0; i < class_devno_max(); i++) {
509                 struct obd_device *obd = class_num2obd(i);
510
511                 if (obd == NULL)
512                         continue;
513                 if ((strncmp(obd->obd_type->typ_name, typ_name,
514                              strlen(typ_name)) == 0)) {
515                         if (obd_uuid_equals(tgt_uuid,
516                                             &obd->u.cli.cl_target_uuid) &&
517                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
518                                                          &obd->obd_uuid) : 1)) {
519                                 cfs_read_unlock(&obd_dev_lock);
520                                 return obd;
521                         }
522                 }
523         }
524         cfs_read_unlock(&obd_dev_lock);
525
526         return NULL;
527 }
528 EXPORT_SYMBOL(class_find_client_obd);
529
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531    searching at *next, and if a device is found, the next index to look
532    at is saved in *next. If next is NULL, then the first matching device
533    will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
535 {
536         int i;
537
538         if (next == NULL)
539                 i = 0;
540         else if (*next >= 0 && *next < class_devno_max())
541                 i = *next;
542         else
543                 return NULL;
544
545         cfs_read_lock(&obd_dev_lock);
546         for (; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552                         if (next != NULL)
553                                 *next = i+1;
554                         cfs_read_unlock(&obd_dev_lock);
555                         return obd;
556                 }
557         }
558         cfs_read_unlock(&obd_dev_lock);
559
560         return NULL;
561 }
562 EXPORT_SYMBOL(class_devices_in_group);
563
564 /**
565  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566  * adjust sptlrpc settings accordingly.
567  */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 {
570         struct obd_device  *obd;
571         const char         *type;
572         int                 i, rc = 0, rc2;
573
574         LASSERT(namelen > 0);
575
576         cfs_read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 obd = class_num2obd(i);
579
580                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581                         continue;
582
583                 /* only notify mdc, osc, mdt, ost */
584                 type = obd->obd_type->typ_name;
585                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OST_NAME) != 0)
589                         continue;
590
591                 if (strncmp(obd->obd_name, fsname, namelen))
592                         continue;
593
594                 class_incref(obd, __FUNCTION__, obd);
595                 cfs_read_unlock(&obd_dev_lock);
596                 rc2 = obd_set_info_async(obd->obd_self_export,
597                                          sizeof(KEY_SPTLRPC_CONF),
598                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
599                 rc = rc ? rc : rc2;
600                 class_decref(obd, __FUNCTION__, obd);
601                 cfs_read_lock(&obd_dev_lock);
602         }
603         cfs_read_unlock(&obd_dev_lock);
604         return rc;
605 }
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607
608 void obd_cleanup_caches(void)
609 {
610         int rc;
611
612         ENTRY;
613         if (obd_device_cachep) {
614                 rc = cfs_mem_cache_destroy(obd_device_cachep);
615                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616                 obd_device_cachep = NULL;
617         }
618         if (obdo_cachep) {
619                 rc = cfs_mem_cache_destroy(obdo_cachep);
620                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
621                 obdo_cachep = NULL;
622         }
623         if (import_cachep) {
624                 rc = cfs_mem_cache_destroy(import_cachep);
625                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626                 import_cachep = NULL;
627         }
628         if (capa_cachep) {
629                 rc = cfs_mem_cache_destroy(capa_cachep);
630                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
631                 capa_cachep = NULL;
632         }
633         EXIT;
634 }
635
636 int obd_init_caches(void)
637 {
638         ENTRY;
639
640         LASSERT(obd_device_cachep == NULL);
641         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642                                                  sizeof(struct obd_device),
643                                                  0, 0);
644         if (!obd_device_cachep)
645                 GOTO(out, -ENOMEM);
646
647         LASSERT(obdo_cachep == NULL);
648         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
649                                            0, 0);
650         if (!obdo_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(import_cachep == NULL);
654         import_cachep = cfs_mem_cache_create("ll_import_cache",
655                                              sizeof(struct obd_import),
656                                              0, 0);
657         if (!import_cachep)
658                 GOTO(out, -ENOMEM);
659
660         LASSERT(capa_cachep == NULL);
661         capa_cachep = cfs_mem_cache_create("capa_cache",
662                                            sizeof(struct obd_capa), 0, 0);
663         if (!capa_cachep)
664                 GOTO(out, -ENOMEM);
665
666         RETURN(0);
667  out:
668         obd_cleanup_caches();
669         RETURN(-ENOMEM);
670
671 }
672
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 {
676         struct obd_export *export;
677         ENTRY;
678
679         if (!conn) {
680                 CDEBUG(D_CACHE, "looking for null handle\n");
681                 RETURN(NULL);
682         }
683
684         if (conn->cookie == -1) {  /* this means assign a new connection */
685                 CDEBUG(D_CACHE, "want a new connection\n");
686                 RETURN(NULL);
687         }
688
689         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690         export = class_handle2object(conn->cookie);
691         RETURN(export);
692 }
693 EXPORT_SYMBOL(class_conn2export);
694
695 struct obd_device *class_exp2obd(struct obd_export *exp)
696 {
697         if (exp)
698                 return exp->exp_obd;
699         return NULL;
700 }
701 EXPORT_SYMBOL(class_exp2obd);
702
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         export = class_conn2export(conn);
707         if (export) {
708                 struct obd_device *obd = export->exp_obd;
709                 class_export_put(export);
710                 return obd;
711         }
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_conn2obd);
715
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         if (obd == NULL)
720                 return NULL;
721         return obd->u.cli.cl_import;
722 }
723 EXPORT_SYMBOL(class_exp2cliimp);
724
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 {
727         struct obd_device *obd = class_conn2obd(conn);
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_conn2cliimp);
733
734 /* Export management functions */
735
736 /* if export is involved in recovery then clean up related things */
737 void class_export_recovery_cleanup(struct obd_export *exp)
738 {
739         struct obd_device *obd = exp->exp_obd;
740
741         cfs_spin_lock(&obd->obd_recovery_task_lock);
742         if (exp->exp_delayed)
743                 obd->obd_delayed_clients--;
744         if (obd->obd_recovering && exp->exp_in_recovery) {
745                 cfs_spin_lock(&exp->exp_lock);
746                 exp->exp_in_recovery = 0;
747                 cfs_spin_unlock(&exp->exp_lock);
748                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
749                 cfs_atomic_dec(&obd->obd_connected_clients);
750         }
751         cfs_spin_unlock(&obd->obd_recovery_task_lock);
752         /** Cleanup req replay fields */
753         if (exp->exp_req_replay_needed) {
754                 cfs_spin_lock(&exp->exp_lock);
755                 exp->exp_req_replay_needed = 0;
756                 cfs_spin_unlock(&exp->exp_lock);
757                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
758                 cfs_atomic_dec(&obd->obd_req_replay_clients);
759         }
760         /** Cleanup lock replay data */
761         if (exp->exp_lock_replay_needed) {
762                 cfs_spin_lock(&exp->exp_lock);
763                 exp->exp_lock_replay_needed = 0;
764                 cfs_spin_unlock(&exp->exp_lock);
765                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
766                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
767         }
768 }
769
770 static void class_export_destroy(struct obd_export *exp)
771 {
772         struct obd_device *obd = exp->exp_obd;
773         ENTRY;
774
775         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776         LASSERT(obd != NULL);
777
778         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779                exp->exp_client_uuid.uuid, obd->obd_name);
780
781
782         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783         if (exp->exp_connection)
784                 ptlrpc_put_connection_superhack(exp->exp_connection);
785
786         LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
787         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
788         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
789         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
790         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
791         obd_destroy_export(exp);
792         class_decref(obd, "export", exp);
793
794         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
795         EXIT;
796 }
797
798 static void export_handle_addref(void *export)
799 {
800         class_export_get(export);
801 }
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         cfs_atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                cfs_atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                cfs_atomic_read(&exp->exp_refcount) - 1);
818
819         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
821                 CDEBUG(D_IOCTL, "final put %p/%s\n",
822                        exp, exp->exp_client_uuid.uuid);
823
824                 /* release nid stat refererence */
825                 lprocfs_exp_cleanup(exp);
826                 class_export_recovery_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         cfs_hash_t *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         cfs_atomic_set(&export->exp_refcount, 2);
851         cfs_atomic_set(&export->exp_rpc_count, 0);
852         cfs_atomic_set(&export->exp_cb_count, 0);
853         cfs_atomic_set(&export->exp_locks_count, 0);
854 #if LUSTRE_TRACKS_LOCK_EXP_REFS
855         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
856         cfs_spin_lock_init(&export->exp_locks_list_guard);
857 #endif
858         cfs_atomic_set(&export->exp_replay_count, 0);
859         export->exp_obd = obd;
860         CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
861         cfs_rwlock_init(&export->exp_flock_wait_lock);
862         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
863         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
864         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
865         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
866         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
867         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
868         class_handle_hash(&export->exp_handle, export_handle_addref);
869         export->exp_last_request_time = cfs_time_current_sec();
870         cfs_spin_lock_init(&export->exp_lock);
871         cfs_spin_lock_init(&export->exp_rpc_lock);
872         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
873         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
874         cfs_spin_lock_init(&export->exp_bl_list_lock);
875         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
876
877         export->exp_sp_peer = LUSTRE_SP_ANY;
878         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
879         export->exp_client_uuid = *cluuid;
880         obd_init_export(export);
881
882         cfs_spin_lock(&obd->obd_dev_lock);
883          /* shouldn't happen, but might race */
884         if (obd->obd_stopping)
885                 GOTO(exit_unlock, rc = -ENODEV);
886
887         hash = cfs_hash_getref(obd->obd_uuid_hash);
888         if (hash == NULL)
889                 GOTO(exit_unlock, rc = -ENODEV);
890         cfs_spin_unlock(&obd->obd_dev_lock);
891
892         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
893                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894                 if (rc != 0) {
895                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
896                                       obd->obd_name, cluuid->uuid, rc);
897                         GOTO(exit_err, rc = -EALREADY);
898                 }
899         }
900
901         cfs_spin_lock(&obd->obd_dev_lock);
902         if (obd->obd_stopping) {
903                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904                 GOTO(exit_unlock, rc = -ENODEV);
905         }
906
907         class_incref(obd, "export", export);
908         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909         cfs_list_add_tail(&export->exp_obd_chain_timed,
910                           &export->exp_obd->obd_exports_timed);
911         export->exp_obd->obd_num_exports++;
912         cfs_spin_unlock(&obd->obd_dev_lock);
913         cfs_hash_putref(hash);
914         RETURN(export);
915
916 exit_unlock:
917         cfs_spin_unlock(&obd->obd_dev_lock);
918 exit_err:
919         if (hash)
920                 cfs_hash_putref(hash);
921         class_handle_unhash(&export->exp_handle);
922         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
923         obd_destroy_export(export);
924         OBD_FREE_PTR(export);
925         return ERR_PTR(rc);
926 }
927 EXPORT_SYMBOL(class_new_export);
928
929 void class_unlink_export(struct obd_export *exp)
930 {
931         class_handle_unhash(&exp->exp_handle);
932
933         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
934         /* delete an uuid-export hashitem from hashtables */
935         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
936                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937                              &exp->exp_client_uuid,
938                              &exp->exp_uuid_hash);
939
940         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
941         cfs_list_del_init(&exp->exp_obd_chain_timed);
942         exp->exp_obd->obd_num_exports--;
943         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
944         class_export_put(exp);
945 }
946 EXPORT_SYMBOL(class_unlink_export);
947
948 /* Import management functions */
949 void class_import_destroy(struct obd_import *imp)
950 {
951         ENTRY;
952
953         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
954                 imp->imp_obd->obd_name);
955
956         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
957
958         ptlrpc_put_connection_superhack(imp->imp_connection);
959
960         while (!cfs_list_empty(&imp->imp_conn_list)) {
961                 struct obd_import_conn *imp_conn;
962
963                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
964                                           struct obd_import_conn, oic_item);
965                 cfs_list_del_init(&imp_conn->oic_item);
966                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
967                 OBD_FREE(imp_conn, sizeof(*imp_conn));
968         }
969
970         LASSERT(imp->imp_sec == NULL);
971         class_decref(imp->imp_obd, "import", imp);
972         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
973         EXIT;
974 }
975
976 static void import_handle_addref(void *import)
977 {
978         class_import_get(import);
979 }
980
981 struct obd_import *class_import_get(struct obd_import *import)
982 {
983         cfs_atomic_inc(&import->imp_refcount);
984         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985                cfs_atomic_read(&import->imp_refcount),
986                import->imp_obd->obd_name);
987         return import;
988 }
989 EXPORT_SYMBOL(class_import_get);
990
991 void class_import_put(struct obd_import *imp)
992 {
993         ENTRY;
994
995         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
997
998         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999                cfs_atomic_read(&imp->imp_refcount) - 1,
1000                imp->imp_obd->obd_name);
1001
1002         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003                 CDEBUG(D_INFO, "final put import %p\n", imp);
1004                 obd_zombie_import_add(imp);
1005         }
1006
1007         EXIT;
1008 }
1009 EXPORT_SYMBOL(class_import_put);
1010
1011 static void init_imp_at(struct imp_at *at) {
1012         int i;
1013         at_init(&at->iat_net_latency, 0, 0);
1014         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1015                 /* max service estimates are tracked on the server side, so
1016                    don't use the AT history here, just use the last reported
1017                    val. (But keep hist for proc histogram, worst_ever) */
1018                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1019                         AT_FLG_NOHIST);
1020         }
1021 }
1022
1023 struct obd_import *class_new_import(struct obd_device *obd)
1024 {
1025         struct obd_import *imp;
1026
1027         OBD_ALLOC(imp, sizeof(*imp));
1028         if (imp == NULL)
1029                 return NULL;
1030
1031         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1033         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1034         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1035         cfs_spin_lock_init(&imp->imp_lock);
1036         imp->imp_last_success_conn = 0;
1037         imp->imp_state = LUSTRE_IMP_NEW;
1038         imp->imp_obd = class_incref(obd, "import", imp);
1039         cfs_mutex_init(&imp->imp_sec_mutex);
1040         cfs_waitq_init(&imp->imp_recovery_waitq);
1041
1042         cfs_atomic_set(&imp->imp_refcount, 2);
1043         cfs_atomic_set(&imp->imp_unregistering, 0);
1044         cfs_atomic_set(&imp->imp_inflight, 0);
1045         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1046         cfs_atomic_set(&imp->imp_inval_count, 0);
1047         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1048         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1049         class_handle_hash(&imp->imp_handle, import_handle_addref);
1050         init_imp_at(&imp->imp_at);
1051
1052         /* the default magic is V2, will be used in connect RPC, and
1053          * then adjusted according to the flags in request/reply. */
1054         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1055
1056         return imp;
1057 }
1058 EXPORT_SYMBOL(class_new_import);
1059
1060 void class_destroy_import(struct obd_import *import)
1061 {
1062         LASSERT(import != NULL);
1063         LASSERT(import != LP_POISON);
1064
1065         class_handle_unhash(&import->imp_handle);
1066
1067         cfs_spin_lock(&import->imp_lock);
1068         import->imp_generation++;
1069         cfs_spin_unlock(&import->imp_lock);
1070         class_import_put(import);
1071 }
1072 EXPORT_SYMBOL(class_destroy_import);
1073
1074 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1075
1076 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1077 {
1078         cfs_spin_lock(&exp->exp_locks_list_guard);
1079
1080         LASSERT(lock->l_exp_refs_nr >= 0);
1081
1082         if (lock->l_exp_refs_target != NULL &&
1083             lock->l_exp_refs_target != exp) {
1084                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1085                               exp, lock, lock->l_exp_refs_target);
1086         }
1087         if ((lock->l_exp_refs_nr ++) == 0) {
1088                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1089                 lock->l_exp_refs_target = exp;
1090         }
1091         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1092                lock, exp, lock->l_exp_refs_nr);
1093         cfs_spin_unlock(&exp->exp_locks_list_guard);
1094 }
1095 EXPORT_SYMBOL(__class_export_add_lock_ref);
1096
1097 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1098 {
1099         cfs_spin_lock(&exp->exp_locks_list_guard);
1100         LASSERT(lock->l_exp_refs_nr > 0);
1101         if (lock->l_exp_refs_target != exp) {
1102                 LCONSOLE_WARN("lock %p, "
1103                               "mismatching export pointers: %p, %p\n",
1104                               lock, lock->l_exp_refs_target, exp);
1105         }
1106         if (-- lock->l_exp_refs_nr == 0) {
1107                 cfs_list_del_init(&lock->l_exp_refs_link);
1108                 lock->l_exp_refs_target = NULL;
1109         }
1110         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1111                lock, exp, lock->l_exp_refs_nr);
1112         cfs_spin_unlock(&exp->exp_locks_list_guard);
1113 }
1114 EXPORT_SYMBOL(__class_export_del_lock_ref);
1115 #endif
1116
1117 /* A connection defines an export context in which preallocation can
1118    be managed. This releases the export pointer reference, and returns
1119    the export handle, so the export refcount is 1 when this function
1120    returns. */
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122                   struct obd_uuid *cluuid)
1123 {
1124         struct obd_export *export;
1125         LASSERT(conn != NULL);
1126         LASSERT(obd != NULL);
1127         LASSERT(cluuid != NULL);
1128         ENTRY;
1129
1130         export = class_new_export(obd, cluuid);
1131         if (IS_ERR(export))
1132                 RETURN(PTR_ERR(export));
1133
1134         conn->cookie = export->exp_handle.h_cookie;
1135         class_export_put(export);
1136
1137         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138                cluuid->uuid, conn->cookie);
1139         RETURN(0);
1140 }
1141 EXPORT_SYMBOL(class_connect);
1142
1143
1144 /* This function removes 1-3 references from the export:
1145  * 1 - for export pointer passed
1146  * and if disconnect really need
1147  * 2 - removing from hash
1148  * 3 - in client_unlink_export
1149  * The export pointer passed to this function can destroyed */
1150 int class_disconnect(struct obd_export *export)
1151 {
1152         int already_disconnected;
1153         ENTRY;
1154
1155         if (export == NULL) {
1156                 CWARN("attempting to free NULL export %p\n", export);
1157                 RETURN(-EINVAL);
1158         }
1159
1160         cfs_spin_lock(&export->exp_lock);
1161         already_disconnected = export->exp_disconnected;
1162         export->exp_disconnected = 1;
1163         cfs_spin_unlock(&export->exp_lock);
1164
1165         /* class_cleanup(), abort_recovery(), and class_fail_export()
1166          * all end up in here, and if any of them race we shouldn't
1167          * call extra class_export_puts(). */
1168         if (already_disconnected) {
1169                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1170                 GOTO(no_disconn, already_disconnected);
1171         }
1172
1173         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1174                export->exp_handle.h_cookie);
1175
1176         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1177                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1178                              &export->exp_connection->c_peer.nid,
1179                              &export->exp_nid_hash);
1180
1181         class_unlink_export(export);
1182 no_disconn:
1183         class_export_put(export);
1184         RETURN(0);
1185 }
1186 EXPORT_SYMBOL(class_disconnect);
1187
1188 /* Return non-zero for a fully connected export */
1189 int class_connected_export(struct obd_export *exp)
1190 {
1191         if (exp) {
1192                 int connected;
1193                 cfs_spin_lock(&exp->exp_lock);
1194                 connected = (exp->exp_conn_cnt > 0);
1195                 cfs_spin_unlock(&exp->exp_lock);
1196                 return connected;
1197         }
1198         return 0;
1199 }
1200 EXPORT_SYMBOL(class_connected_export);
1201
1202 static void class_disconnect_export_list(cfs_list_t *list,
1203                                          enum obd_option flags)
1204 {
1205         int rc;
1206         struct obd_export *exp;
1207         ENTRY;
1208
1209         /* It's possible that an export may disconnect itself, but
1210          * nothing else will be added to this list. */
1211         while (!cfs_list_empty(list)) {
1212                 exp = cfs_list_entry(list->next, struct obd_export,
1213                                      exp_obd_chain);
1214                 /* need for safe call CDEBUG after obd_disconnect */
1215                 class_export_get(exp);
1216
1217                 cfs_spin_lock(&exp->exp_lock);
1218                 exp->exp_flags = flags;
1219                 cfs_spin_unlock(&exp->exp_lock);
1220
1221                 if (obd_uuid_equals(&exp->exp_client_uuid,
1222                                     &exp->exp_obd->obd_uuid)) {
1223                         CDEBUG(D_HA,
1224                                "exp %p export uuid == obd uuid, don't discon\n",
1225                                exp);
1226                         /* Need to delete this now so we don't end up pointing
1227                          * to work_list later when this export is cleaned up. */
1228                         cfs_list_del_init(&exp->exp_obd_chain);
1229                         class_export_put(exp);
1230                         continue;
1231                 }
1232
1233                 class_export_get(exp);
1234                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1235                        "last request at "CFS_TIME_T"\n",
1236                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1237                        exp, exp->exp_last_request_time);
1238                 /* release one export reference anyway */
1239                 rc = obd_disconnect(exp);
1240
1241                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1242                        obd_export_nid2str(exp), exp, rc);
1243                 class_export_put(exp);
1244         }
1245         EXIT;
1246 }
1247
1248 void class_disconnect_exports(struct obd_device *obd)
1249 {
1250         cfs_list_t work_list;
1251         ENTRY;
1252
1253         /* Move all of the exports from obd_exports to a work list, en masse. */
1254         CFS_INIT_LIST_HEAD(&work_list);
1255         cfs_spin_lock(&obd->obd_dev_lock);
1256         cfs_list_splice_init(&obd->obd_exports, &work_list);
1257         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1258         cfs_spin_unlock(&obd->obd_dev_lock);
1259
1260         if (!cfs_list_empty(&work_list)) {
1261                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1262                        "disconnecting them\n", obd->obd_minor, obd);
1263                 class_disconnect_export_list(&work_list,
1264                                              exp_flags_from_obd(obd));
1265         } else
1266                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1267                        obd->obd_minor, obd);
1268         EXIT;
1269 }
1270 EXPORT_SYMBOL(class_disconnect_exports);
1271
1272 /* Remove exports that have not completed recovery.
1273  */
1274 void class_disconnect_stale_exports(struct obd_device *obd,
1275                                     int (*test_export)(struct obd_export *))
1276 {
1277         cfs_list_t work_list;
1278         cfs_list_t *pos, *n;
1279         struct obd_export *exp;
1280         int evicted = 0;
1281         ENTRY;
1282
1283         CFS_INIT_LIST_HEAD(&work_list);
1284         cfs_spin_lock(&obd->obd_dev_lock);
1285         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1286                 int failed;
1287
1288                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1289
1290                 /* don't count self-export as client */
1291                 if (obd_uuid_equals(&exp->exp_client_uuid,
1292                                     &exp->exp_obd->obd_uuid))
1293                         continue;
1294
1295                 if (test_export(exp))
1296                         continue;
1297
1298                 cfs_spin_lock(&exp->exp_lock);
1299                 failed = exp->exp_failed;
1300                 exp->exp_failed = 1;
1301                 cfs_spin_unlock(&exp->exp_lock);
1302                 if (failed)
1303                         continue;
1304
1305                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1306                 evicted++;
1307                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1308                        obd->obd_name, exp->exp_client_uuid.uuid,
1309                        exp->exp_connection == NULL ? "<unknown>" :
1310                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1311                 print_export_data(exp, "EVICTING", 0);
1312         }
1313         cfs_spin_unlock(&obd->obd_dev_lock);
1314
1315         if (evicted) {
1316                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1317                        obd->obd_name, evicted);
1318                 obd->obd_stale_clients += evicted;
1319         }
1320         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1321                                                  OBD_OPT_ABORT_RECOV);
1322         EXIT;
1323 }
1324 EXPORT_SYMBOL(class_disconnect_stale_exports);
1325
1326 void class_fail_export(struct obd_export *exp)
1327 {
1328         int rc, already_failed;
1329
1330         cfs_spin_lock(&exp->exp_lock);
1331         already_failed = exp->exp_failed;
1332         exp->exp_failed = 1;
1333         cfs_spin_unlock(&exp->exp_lock);
1334
1335         if (already_failed) {
1336                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1337                        exp, exp->exp_client_uuid.uuid);
1338                 return;
1339         }
1340
1341         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1342                exp, exp->exp_client_uuid.uuid);
1343
1344         if (obd_dump_on_timeout)
1345                 libcfs_debug_dumplog();
1346
1347         /* need for safe call CDEBUG after obd_disconnect */
1348         class_export_get(exp);
1349
1350         /* Most callers into obd_disconnect are removing their own reference
1351          * (request, for example) in addition to the one from the hash table.
1352          * We don't have such a reference here, so make one. */
1353         class_export_get(exp);
1354         rc = obd_disconnect(exp);
1355         if (rc)
1356                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1357         else
1358                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1359                        exp, exp->exp_client_uuid.uuid);
1360         class_export_put(exp);
1361 }
1362 EXPORT_SYMBOL(class_fail_export);
1363
1364 char *obd_export_nid2str(struct obd_export *exp)
1365 {
1366         if (exp->exp_connection != NULL)
1367                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1368
1369         return "(no nid)";
1370 }
1371 EXPORT_SYMBOL(obd_export_nid2str);
1372
1373 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1374 {
1375         struct obd_export *doomed_exp = NULL;
1376         int exports_evicted = 0;
1377
1378         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1379
1380         do {
1381                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1382                 if (doomed_exp == NULL)
1383                         break;
1384
1385                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1386                          "nid %s found, wanted nid %s, requested nid %s\n",
1387                          obd_export_nid2str(doomed_exp),
1388                          libcfs_nid2str(nid_key), nid);
1389                 LASSERTF(doomed_exp != obd->obd_self_export,
1390                          "self-export is hashed by NID?\n");
1391                 exports_evicted++;
1392                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1393                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1394                        exports_evicted);
1395                 class_fail_export(doomed_exp);
1396                 class_export_put(doomed_exp);
1397         } while (1);
1398
1399         if (!exports_evicted)
1400                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1401                        obd->obd_name, nid);
1402         return exports_evicted;
1403 }
1404 EXPORT_SYMBOL(obd_export_evict_by_nid);
1405
1406 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1407 {
1408         struct obd_export *doomed_exp = NULL;
1409         struct obd_uuid doomed_uuid;
1410         int exports_evicted = 0;
1411
1412         obd_str2uuid(&doomed_uuid, uuid);
1413         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1414                 CERROR("%s: can't evict myself\n", obd->obd_name);
1415                 return exports_evicted;
1416         }
1417
1418         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1419
1420         if (doomed_exp == NULL) {
1421                 CERROR("%s: can't disconnect %s: no exports found\n",
1422                        obd->obd_name, uuid);
1423         } else {
1424                 CWARN("%s: evicting %s at adminstrative request\n",
1425                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1426                 class_fail_export(doomed_exp);
1427                 class_export_put(doomed_exp);
1428                 exports_evicted++;
1429         }
1430
1431         return exports_evicted;
1432 }
1433 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1434
1435 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1436 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1437 EXPORT_SYMBOL(class_export_dump_hook);
1438 #endif
1439
1440 static void print_export_data(struct obd_export *exp, const char *status,
1441                               int locks)
1442 {
1443         struct ptlrpc_reply_state *rs;
1444         struct ptlrpc_reply_state *first_reply = NULL;
1445         int nreplies = 0;
1446
1447         cfs_spin_lock(&exp->exp_lock);
1448         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1449                                 rs_exp_list) {
1450                 if (nreplies == 0)
1451                         first_reply = rs;
1452                 nreplies++;
1453         }
1454         cfs_spin_unlock(&exp->exp_lock);
1455
1456         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1457                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1458                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1459                cfs_atomic_read(&exp->exp_rpc_count),
1460                cfs_atomic_read(&exp->exp_cb_count),
1461                cfs_atomic_read(&exp->exp_locks_count),
1462                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1463                nreplies, first_reply, nreplies > 3 ? "..." : "",
1464                exp->exp_last_committed);
1465 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1466         if (locks && class_export_dump_hook != NULL)
1467                 class_export_dump_hook(exp);
1468 #endif
1469 }
1470
1471 void dump_exports(struct obd_device *obd, int locks)
1472 {
1473         struct obd_export *exp;
1474
1475         cfs_spin_lock(&obd->obd_dev_lock);
1476         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1477                 print_export_data(exp, "ACTIVE", locks);
1478         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1479                 print_export_data(exp, "UNLINKED", locks);
1480         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1481                 print_export_data(exp, "DELAYED", locks);
1482         cfs_spin_unlock(&obd->obd_dev_lock);
1483         cfs_spin_lock(&obd_zombie_impexp_lock);
1484         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1485                 print_export_data(exp, "ZOMBIE", locks);
1486         cfs_spin_unlock(&obd_zombie_impexp_lock);
1487 }
1488 EXPORT_SYMBOL(dump_exports);
1489
1490 void obd_exports_barrier(struct obd_device *obd)
1491 {
1492         int waited = 2;
1493         LASSERT(cfs_list_empty(&obd->obd_exports));
1494         cfs_spin_lock(&obd->obd_dev_lock);
1495         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1496                 cfs_spin_unlock(&obd->obd_dev_lock);
1497                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1498                                                    cfs_time_seconds(waited));
1499                 if (waited > 5 && IS_PO2(waited)) {
1500                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1501                                       "more than %d seconds. "
1502                                       "The obd refcount = %d. Is it stuck?\n",
1503                                       obd->obd_name, waited,
1504                                       cfs_atomic_read(&obd->obd_refcount));
1505                         dump_exports(obd, 1);
1506                 }
1507                 waited *= 2;
1508                 cfs_spin_lock(&obd->obd_dev_lock);
1509         }
1510         cfs_spin_unlock(&obd->obd_dev_lock);
1511 }
1512 EXPORT_SYMBOL(obd_exports_barrier);
1513
1514 /* Total amount of zombies to be destroyed */
1515 static int zombies_count = 0;
1516
1517 /**
1518  * kill zombie imports and exports
1519  */
1520 void obd_zombie_impexp_cull(void)
1521 {
1522         struct obd_import *import;
1523         struct obd_export *export;
1524         ENTRY;
1525
1526         do {
1527                 cfs_spin_lock(&obd_zombie_impexp_lock);
1528
1529                 import = NULL;
1530                 if (!cfs_list_empty(&obd_zombie_imports)) {
1531                         import = cfs_list_entry(obd_zombie_imports.next,
1532                                                 struct obd_import,
1533                                                 imp_zombie_chain);
1534                         cfs_list_del_init(&import->imp_zombie_chain);
1535                 }
1536
1537                 export = NULL;
1538                 if (!cfs_list_empty(&obd_zombie_exports)) {
1539                         export = cfs_list_entry(obd_zombie_exports.next,
1540                                                 struct obd_export,
1541                                                 exp_obd_chain);
1542                         cfs_list_del_init(&export->exp_obd_chain);
1543                 }
1544
1545                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1546
1547                 if (import != NULL) {
1548                         class_import_destroy(import);
1549                         cfs_spin_lock(&obd_zombie_impexp_lock);
1550                         zombies_count--;
1551                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1552                 }
1553
1554                 if (export != NULL) {
1555                         class_export_destroy(export);
1556                         cfs_spin_lock(&obd_zombie_impexp_lock);
1557                         zombies_count--;
1558                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1559                 }
1560
1561                 cfs_cond_resched();
1562         } while (import != NULL || export != NULL);
1563         EXIT;
1564 }
1565
1566 static cfs_completion_t         obd_zombie_start;
1567 static cfs_completion_t         obd_zombie_stop;
1568 static unsigned long            obd_zombie_flags;
1569 static cfs_waitq_t              obd_zombie_waitq;
1570 static pid_t                    obd_zombie_pid;
1571
1572 enum {
1573         OBD_ZOMBIE_STOP   = 1 << 1
1574 };
1575
1576 /**
1577  * check for work for kill zombie import/export thread.
1578  */
1579 static int obd_zombie_impexp_check(void *arg)
1580 {
1581         int rc;
1582
1583         cfs_spin_lock(&obd_zombie_impexp_lock);
1584         rc = (zombies_count == 0) &&
1585              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1586         cfs_spin_unlock(&obd_zombie_impexp_lock);
1587
1588         RETURN(rc);
1589 }
1590
1591 /**
1592  * Add export to the obd_zombe thread and notify it.
1593  */
1594 static void obd_zombie_export_add(struct obd_export *exp) {
1595         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1596         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1597         cfs_list_del_init(&exp->exp_obd_chain);
1598         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1599         cfs_spin_lock(&obd_zombie_impexp_lock);
1600         zombies_count++;
1601         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1602         cfs_spin_unlock(&obd_zombie_impexp_lock);
1603
1604         obd_zombie_impexp_notify();
1605 }
1606
1607 /**
1608  * Add import to the obd_zombe thread and notify it.
1609  */
1610 static void obd_zombie_import_add(struct obd_import *imp) {
1611         LASSERT(imp->imp_sec == NULL);
1612         LASSERT(imp->imp_rq_pool == NULL);
1613         cfs_spin_lock(&obd_zombie_impexp_lock);
1614         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1615         zombies_count++;
1616         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1617         cfs_spin_unlock(&obd_zombie_impexp_lock);
1618
1619         obd_zombie_impexp_notify();
1620 }
1621
1622 /**
1623  * notify import/export destroy thread about new zombie.
1624  */
1625 static void obd_zombie_impexp_notify(void)
1626 {
1627         /*
1628          * Make sure obd_zomebie_impexp_thread get this notification.
1629          * It is possible this signal only get by obd_zombie_barrier, and
1630          * barrier gulps this notification and sleeps away and hangs ensues
1631          */
1632         cfs_waitq_broadcast(&obd_zombie_waitq);
1633 }
1634
1635 /**
1636  * check whether obd_zombie is idle
1637  */
1638 static int obd_zombie_is_idle(void)
1639 {
1640         int rc;
1641
1642         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1643         cfs_spin_lock(&obd_zombie_impexp_lock);
1644         rc = (zombies_count == 0);
1645         cfs_spin_unlock(&obd_zombie_impexp_lock);
1646         return rc;
1647 }
1648
1649 /**
1650  * wait when obd_zombie import/export queues become empty
1651  */
1652 void obd_zombie_barrier(void)
1653 {
1654         struct l_wait_info lwi = { 0 };
1655
1656         if (obd_zombie_pid == cfs_curproc_pid())
1657                 /* don't wait for myself */
1658                 return;
1659         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1660 }
1661 EXPORT_SYMBOL(obd_zombie_barrier);
1662
1663 #ifdef __KERNEL__
1664
1665 /**
1666  * destroy zombie export/import thread.
1667  */
1668 static int obd_zombie_impexp_thread(void *unused)
1669 {
1670         int rc;
1671
1672         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1673                 cfs_complete(&obd_zombie_start);
1674                 RETURN(rc);
1675         }
1676
1677         cfs_complete(&obd_zombie_start);
1678
1679         obd_zombie_pid = cfs_curproc_pid();
1680
1681         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1682                 struct l_wait_info lwi = { 0 };
1683
1684                 l_wait_event(obd_zombie_waitq,
1685                              !obd_zombie_impexp_check(NULL), &lwi);
1686                 obd_zombie_impexp_cull();
1687
1688                 /*
1689                  * Notify obd_zombie_barrier callers that queues
1690                  * may be empty.
1691                  */
1692                 cfs_waitq_signal(&obd_zombie_waitq);
1693         }
1694
1695         cfs_complete(&obd_zombie_stop);
1696
1697         RETURN(0);
1698 }
1699
1700 #else /* ! KERNEL */
1701
1702 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1703 static void *obd_zombie_impexp_work_cb;
1704 static void *obd_zombie_impexp_idle_cb;
1705
1706 int obd_zombie_impexp_kill(void *arg)
1707 {
1708         int rc = 0;
1709
1710         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1711                 obd_zombie_impexp_cull();
1712                 rc = 1;
1713         }
1714         cfs_atomic_dec(&zombie_recur);
1715         return rc;
1716 }
1717
1718 #endif
1719
1720 /**
1721  * start destroy zombie import/export thread
1722  */
1723 int obd_zombie_impexp_init(void)
1724 {
1725         int rc;
1726
1727         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1728         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1729         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1730         cfs_init_completion(&obd_zombie_start);
1731         cfs_init_completion(&obd_zombie_stop);
1732         cfs_waitq_init(&obd_zombie_waitq);
1733         obd_zombie_pid = 0;
1734
1735 #ifdef __KERNEL__
1736         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1737         if (rc < 0)
1738                 RETURN(rc);
1739
1740         cfs_wait_for_completion(&obd_zombie_start);
1741 #else
1742
1743         obd_zombie_impexp_work_cb =
1744                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1745                                                  &obd_zombie_impexp_kill, NULL);
1746
1747         obd_zombie_impexp_idle_cb =
1748                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1749                                                  &obd_zombie_impexp_check, NULL);
1750         rc = 0;
1751 #endif
1752         RETURN(rc);
1753 }
1754 /**
1755  * stop destroy zombie import/export thread
1756  */
1757 void obd_zombie_impexp_stop(void)
1758 {
1759         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1760         obd_zombie_impexp_notify();
1761 #ifdef __KERNEL__
1762         cfs_wait_for_completion(&obd_zombie_stop);
1763 #else
1764         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1765         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1766 #endif
1767 }
1768
1769 /***** Kernel-userspace comm helpers *******/
1770
1771 /* Get length of entire message, including header */
1772 int kuc_len(int payload_len)
1773 {
1774         return sizeof(struct kuc_hdr) + payload_len;
1775 }
1776 EXPORT_SYMBOL(kuc_len);
1777
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779  * @param p Pointer to payload area
1780  * @returns Pointer to kuc header
1781  */
1782 struct kuc_hdr * kuc_ptr(void *p)
1783 {
1784         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785         LASSERT(lh->kuc_magic == KUC_MAGIC);
1786         return lh;
1787 }
1788 EXPORT_SYMBOL(kuc_ptr);
1789
1790 /* Test if payload is part of kuc message
1791  * @param p Pointer to payload area
1792  * @returns boolean
1793  */
1794 int kuc_ispayload(void *p)
1795 {
1796         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1797
1798         if (kh->kuc_magic == KUC_MAGIC)
1799                 return 1;
1800         else
1801                 return 0;
1802 }
1803 EXPORT_SYMBOL(kuc_ispayload);
1804
1805 /* Alloc space for a message, and fill in header
1806  * @return Pointer to payload area
1807  */
1808 void *kuc_alloc(int payload_len, int transport, int type)
1809 {
1810         struct kuc_hdr *lh;
1811         int len = kuc_len(payload_len);
1812
1813         OBD_ALLOC(lh, len);
1814         if (lh == NULL)
1815                 return ERR_PTR(-ENOMEM);
1816
1817         lh->kuc_magic = KUC_MAGIC;
1818         lh->kuc_transport = transport;
1819         lh->kuc_msgtype = type;
1820         lh->kuc_msglen = len;
1821
1822         return (void *)(lh + 1);
1823 }
1824 EXPORT_SYMBOL(kuc_alloc);
1825
1826 /* Takes pointer to payload area */
1827 inline void kuc_free(void *p, int payload_len)
1828 {
1829         struct kuc_hdr *lh = kuc_ptr(p);
1830         OBD_FREE(lh, kuc_len(payload_len));
1831 }
1832 EXPORT_SYMBOL(kuc_free);
1833
1834
1835