Whamcloud - gitweb
LU-9010 obdclass: use static initializer macros where possible
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
46
47 static DEFINE_SPINLOCK(obd_types_lock);
48 static LIST_HEAD(obd_types);
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static LIST_HEAD(obd_zombie_imports);
58 static LIST_HEAD(obd_zombie_exports);
59 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks, int debug_level);
66
67 static LIST_HEAD(obd_stale_exports);
68 static DEFINE_SPINLOCK(obd_stale_export_lock);
69 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
299  * \retval NULL if create fails, otherwise return the obd device
300  *         pointer created.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, osp, lwp, mdt, ost
630                  * because only these have a -sptlrpc llog */
631                 type = obd->obd_type->typ_name;
632                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637                     strcmp(type, LUSTRE_OST_NAME) != 0)
638                         continue;
639
640                 if (strncmp(obd->obd_name, fsname, namelen))
641                         continue;
642
643                 class_incref(obd, __FUNCTION__, obd);
644                 read_unlock(&obd_dev_lock);
645                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646                                          sizeof(KEY_SPTLRPC_CONF),
647                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
648                 rc = rc ? rc : rc2;
649                 class_decref(obd, __FUNCTION__, obd);
650                 read_lock(&obd_dev_lock);
651         }
652         read_unlock(&obd_dev_lock);
653         return rc;
654 }
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
656
657 void obd_cleanup_caches(void)
658 {
659         ENTRY;
660         if (obd_device_cachep) {
661                 kmem_cache_destroy(obd_device_cachep);
662                 obd_device_cachep = NULL;
663         }
664         if (obdo_cachep) {
665                 kmem_cache_destroy(obdo_cachep);
666                 obdo_cachep = NULL;
667         }
668         if (import_cachep) {
669                 kmem_cache_destroy(import_cachep);
670                 import_cachep = NULL;
671         }
672
673         EXIT;
674 }
675
676 int obd_init_caches(void)
677 {
678         int rc;
679         ENTRY;
680
681         LASSERT(obd_device_cachep == NULL);
682         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683                                               sizeof(struct obd_device),
684                                               0, 0, NULL);
685         if (!obd_device_cachep)
686                 GOTO(out, rc = -ENOMEM);
687
688         LASSERT(obdo_cachep == NULL);
689         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
690                                         0, 0, NULL);
691         if (!obdo_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(import_cachep == NULL);
695         import_cachep = kmem_cache_create("ll_import_cache",
696                                           sizeof(struct obd_import),
697                                           0, 0, NULL);
698         if (!import_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         RETURN(0);
702 out:
703         obd_cleanup_caches();
704         RETURN(rc);
705 }
706
707 /* map connection to client */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 {
710         struct obd_export *export;
711         ENTRY;
712
713         if (!conn) {
714                 CDEBUG(D_CACHE, "looking for null handle\n");
715                 RETURN(NULL);
716         }
717
718         if (conn->cookie == -1) {  /* this means assign a new connection */
719                 CDEBUG(D_CACHE, "want a new connection\n");
720                 RETURN(NULL);
721         }
722
723         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
724         export = class_handle2object(conn->cookie, NULL);
725         RETURN(export);
726 }
727 EXPORT_SYMBOL(class_conn2export);
728
729 struct obd_device *class_exp2obd(struct obd_export *exp)
730 {
731         if (exp)
732                 return exp->exp_obd;
733         return NULL;
734 }
735 EXPORT_SYMBOL(class_exp2obd);
736
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         export = class_conn2export(conn);
741         if (export) {
742                 struct obd_device *obd = export->exp_obd;
743                 class_export_put(export);
744                 return obd;
745         }
746         return NULL;
747 }
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765
766 /* Export management functions */
767 static void class_export_destroy(struct obd_export *exp)
768 {
769         struct obd_device *obd = exp->exp_obd;
770         ENTRY;
771
772         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773         LASSERT(obd != NULL);
774
775         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776                exp->exp_client_uuid.uuid, obd->obd_name);
777
778         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779         if (exp->exp_connection)
780                 ptlrpc_put_connection_superhack(exp->exp_connection);
781
782         LASSERT(list_empty(&exp->exp_outstanding_replies));
783         LASSERT(list_empty(&exp->exp_uncommitted_replies));
784         LASSERT(list_empty(&exp->exp_req_replay_queue));
785         LASSERT(list_empty(&exp->exp_hp_rpcs));
786         obd_destroy_export(exp);
787         class_decref(obd, "export", exp);
788
789         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790         EXIT;
791 }
792
/* hop_addref callback for export handles: takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
797
798 static struct portals_handle_ops export_handle_ops = {
799         .hop_addref = export_handle_addref,
800         .hop_free   = NULL,
801 };
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                atomic_read(&exp->exp_refcount) - 1);
818
819         if (atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!list_empty(&exp->exp_obd_chain));
821                 LASSERT(list_empty(&exp->exp_stale_list));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         struct cfs_hash *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         INIT_HLIST_NODE(&export->exp_gen_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         INIT_LIST_HEAD(&export->exp_bl_list);
877         INIT_LIST_HEAD(&export->exp_stale_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
904         spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         list_add_tail(&export->exp_obd_chain_timed,
913                       &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943         if (!hlist_unhashed(&exp->exp_gen_hash)) {
944                 struct tg_export_data   *ted = &exp->exp_target_data;
945                 struct cfs_hash         *hash;
946
947                 /* Because obd_gen_hash will not be released until
948                  * class_cleanup(), so hash should never be NULL here */
949                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
950                 LASSERT(hash != NULL);
951                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
952                              &exp->exp_gen_hash);
953                 cfs_hash_putref(hash);
954         }
955
956         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
957         list_del_init(&exp->exp_obd_chain_timed);
958         exp->exp_obd->obd_num_exports--;
959         spin_unlock(&exp->exp_obd->obd_dev_lock);
960         atomic_inc(&obd_stale_export_num);
961
962         /* A reference is kept by obd_stale_exports list */
963         obd_stale_export_put(exp);
964 }
965 EXPORT_SYMBOL(class_unlink_export);
966
967 /* Import management functions */
968 static void class_import_destroy(struct obd_import *imp)
969 {
970         ENTRY;
971
972         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
973                 imp->imp_obd->obd_name);
974
975         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
976
977         ptlrpc_put_connection_superhack(imp->imp_connection);
978
979         while (!list_empty(&imp->imp_conn_list)) {
980                 struct obd_import_conn *imp_conn;
981
982                 imp_conn = list_entry(imp->imp_conn_list.next,
983                                       struct obd_import_conn, oic_item);
984                 list_del_init(&imp_conn->oic_item);
985                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
986                 OBD_FREE(imp_conn, sizeof(*imp_conn));
987         }
988
989         LASSERT(imp->imp_sec == NULL);
990         class_decref(imp->imp_obd, "import", imp);
991         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
992         EXIT;
993 }
994
/* hop_addref callback for import handles: takes one import reference. */
static void import_handle_addref(void *import)
{
	/* the returned pointer is the argument itself; not needed here */
	(void)class_import_get(import);
}
999
1000 static struct portals_handle_ops import_handle_ops = {
1001         .hop_addref = import_handle_addref,
1002         .hop_free   = NULL,
1003 };
1004
1005 struct obd_import *class_import_get(struct obd_import *import)
1006 {
1007         atomic_inc(&import->imp_refcount);
1008         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1009                atomic_read(&import->imp_refcount),
1010                import->imp_obd->obd_name);
1011         return import;
1012 }
1013 EXPORT_SYMBOL(class_import_get);
1014
1015 void class_import_put(struct obd_import *imp)
1016 {
1017         ENTRY;
1018
1019         LASSERT(list_empty(&imp->imp_zombie_chain));
1020         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1021
1022         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1023                atomic_read(&imp->imp_refcount) - 1,
1024                imp->imp_obd->obd_name);
1025
1026         if (atomic_dec_and_test(&imp->imp_refcount)) {
1027                 CDEBUG(D_INFO, "final put import %p\n", imp);
1028                 obd_zombie_import_add(imp);
1029         }
1030
1031         /* catch possible import put race */
1032         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1033         EXIT;
1034 }
1035 EXPORT_SYMBOL(class_import_put);
1036
1037 static void init_imp_at(struct imp_at *at) {
1038         int i;
1039         at_init(&at->iat_net_latency, 0, 0);
1040         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1041                 /* max service estimates are tracked on the server side, so
1042                    don't use the AT history here, just use the last reported
1043                    val. (But keep hist for proc histogram, worst_ever) */
1044                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1045                         AT_FLG_NOHIST);
1046         }
1047 }
1048
1049 struct obd_import *class_new_import(struct obd_device *obd)
1050 {
1051         struct obd_import *imp;
1052         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1053
1054         OBD_ALLOC(imp, sizeof(*imp));
1055         if (imp == NULL)
1056                 return NULL;
1057
1058         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1059         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1060         INIT_LIST_HEAD(&imp->imp_replay_list);
1061         INIT_LIST_HEAD(&imp->imp_sending_list);
1062         INIT_LIST_HEAD(&imp->imp_delayed_list);
1063         INIT_LIST_HEAD(&imp->imp_committed_list);
1064         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1065         imp->imp_known_replied_xid = 0;
1066         imp->imp_replay_cursor = &imp->imp_committed_list;
1067         spin_lock_init(&imp->imp_lock);
1068         imp->imp_last_success_conn = 0;
1069         imp->imp_state = LUSTRE_IMP_NEW;
1070         imp->imp_obd = class_incref(obd, "import", imp);
1071         mutex_init(&imp->imp_sec_mutex);
1072         init_waitqueue_head(&imp->imp_recovery_waitq);
1073
1074         if (curr_pid_ns->child_reaper)
1075                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1076         else
1077                 imp->imp_sec_refpid = 1;
1078
1079         atomic_set(&imp->imp_refcount, 2);
1080         atomic_set(&imp->imp_unregistering, 0);
1081         atomic_set(&imp->imp_inflight, 0);
1082         atomic_set(&imp->imp_replay_inflight, 0);
1083         atomic_set(&imp->imp_inval_count, 0);
1084         INIT_LIST_HEAD(&imp->imp_conn_list);
1085         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1086         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1087         init_imp_at(&imp->imp_at);
1088
1089         /* the default magic is V2, will be used in connect RPC, and
1090          * then adjusted according to the flags in request/reply. */
1091         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1092
1093         return imp;
1094 }
1095 EXPORT_SYMBOL(class_new_import);
1096
1097 void class_destroy_import(struct obd_import *import)
1098 {
1099         LASSERT(import != NULL);
1100         LASSERT(import != LP_POISON);
1101
1102         class_handle_unhash(&import->imp_handle);
1103
1104         spin_lock(&import->imp_lock);
1105         import->imp_generation++;
1106         spin_unlock(&import->imp_lock);
1107         class_import_put(import);
1108 }
1109 EXPORT_SYMBOL(class_destroy_import);
1110
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1112
1113 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1114 {
1115         spin_lock(&exp->exp_locks_list_guard);
1116
1117         LASSERT(lock->l_exp_refs_nr >= 0);
1118
1119         if (lock->l_exp_refs_target != NULL &&
1120             lock->l_exp_refs_target != exp) {
1121                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1122                               exp, lock, lock->l_exp_refs_target);
1123         }
1124         if ((lock->l_exp_refs_nr ++) == 0) {
1125                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1126                 lock->l_exp_refs_target = exp;
1127         }
1128         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1129                lock, exp, lock->l_exp_refs_nr);
1130         spin_unlock(&exp->exp_locks_list_guard);
1131 }
1132 EXPORT_SYMBOL(__class_export_add_lock_ref);
1133
1134 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1135 {
1136         spin_lock(&exp->exp_locks_list_guard);
1137         LASSERT(lock->l_exp_refs_nr > 0);
1138         if (lock->l_exp_refs_target != exp) {
1139                 LCONSOLE_WARN("lock %p, "
1140                               "mismatching export pointers: %p, %p\n",
1141                               lock, lock->l_exp_refs_target, exp);
1142         }
1143         if (-- lock->l_exp_refs_nr == 0) {
1144                 list_del_init(&lock->l_exp_refs_link);
1145                 lock->l_exp_refs_target = NULL;
1146         }
1147         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1148                lock, exp, lock->l_exp_refs_nr);
1149         spin_unlock(&exp->exp_locks_list_guard);
1150 }
1151 EXPORT_SYMBOL(__class_export_del_lock_ref);
1152 #endif
1153
1154 /* A connection defines an export context in which preallocation can
1155    be managed. This releases the export pointer reference, and returns
1156    the export handle, so the export refcount is 1 when this function
1157    returns. */
1158 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1159                   struct obd_uuid *cluuid)
1160 {
1161         struct obd_export *export;
1162         LASSERT(conn != NULL);
1163         LASSERT(obd != NULL);
1164         LASSERT(cluuid != NULL);
1165         ENTRY;
1166
1167         export = class_new_export(obd, cluuid);
1168         if (IS_ERR(export))
1169                 RETURN(PTR_ERR(export));
1170
1171         conn->cookie = export->exp_handle.h_cookie;
1172         class_export_put(export);
1173
1174         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1175                cluuid->uuid, conn->cookie);
1176         RETURN(0);
1177 }
1178 EXPORT_SYMBOL(class_connect);
1179
1180 /* if export is involved in recovery then clean up related things */
1181 static void class_export_recovery_cleanup(struct obd_export *exp)
1182 {
1183         struct obd_device *obd = exp->exp_obd;
1184
1185         spin_lock(&obd->obd_recovery_task_lock);
1186         if (obd->obd_recovering) {
1187                 if (exp->exp_in_recovery) {
1188                         spin_lock(&exp->exp_lock);
1189                         exp->exp_in_recovery = 0;
1190                         spin_unlock(&exp->exp_lock);
1191                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1192                         atomic_dec(&obd->obd_connected_clients);
1193                 }
1194
1195                 /* if called during recovery then should update
1196                  * obd_stale_clients counter,
1197                  * lightweight exports are not counted */
1198                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1199                         exp->exp_obd->obd_stale_clients++;
1200         }
1201         spin_unlock(&obd->obd_recovery_task_lock);
1202
1203         spin_lock(&exp->exp_lock);
1204         /** Cleanup req replay fields */
1205         if (exp->exp_req_replay_needed) {
1206                 exp->exp_req_replay_needed = 0;
1207
1208                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1209                 atomic_dec(&obd->obd_req_replay_clients);
1210         }
1211
1212         /** Cleanup lock replay data */
1213         if (exp->exp_lock_replay_needed) {
1214                 exp->exp_lock_replay_needed = 0;
1215
1216                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1217                 atomic_dec(&obd->obd_lock_replay_clients);
1218         }
1219         spin_unlock(&exp->exp_lock);
1220 }
1221
1222 /* This function removes 1-3 references from the export:
1223  * 1 - for export pointer passed
1224  * and if disconnect really need
1225  * 2 - removing from hash
1226  * 3 - in client_unlink_export
1227  * The export pointer passed to this function can destroyed */
1228 int class_disconnect(struct obd_export *export)
1229 {
1230         int already_disconnected;
1231         ENTRY;
1232
1233         if (export == NULL) {
1234                 CWARN("attempting to free NULL export %p\n", export);
1235                 RETURN(-EINVAL);
1236         }
1237
1238         spin_lock(&export->exp_lock);
1239         already_disconnected = export->exp_disconnected;
1240         export->exp_disconnected = 1;
1241         /*  We hold references of export for uuid hash
1242          *  and nid_hash and export link at least. So
1243          *  it is safe to call cfs_hash_del in there.  */
1244         if (!hlist_unhashed(&export->exp_nid_hash))
1245                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1246                              &export->exp_connection->c_peer.nid,
1247                              &export->exp_nid_hash);
1248         spin_unlock(&export->exp_lock);
1249
1250         /* class_cleanup(), abort_recovery(), and class_fail_export()
1251          * all end up in here, and if any of them race we shouldn't
1252          * call extra class_export_puts(). */
1253         if (already_disconnected) {
1254                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1255                 GOTO(no_disconn, already_disconnected);
1256         }
1257
1258         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1259                export->exp_handle.h_cookie);
1260
1261         class_export_recovery_cleanup(export);
1262         class_unlink_export(export);
1263 no_disconn:
1264         class_export_put(export);
1265         RETURN(0);
1266 }
1267 EXPORT_SYMBOL(class_disconnect);
1268
1269 /* Return non-zero for a fully connected export */
1270 int class_connected_export(struct obd_export *exp)
1271 {
1272         int connected = 0;
1273
1274         if (exp) {
1275                 spin_lock(&exp->exp_lock);
1276                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1277                 spin_unlock(&exp->exp_lock);
1278         }
1279         return connected;
1280 }
1281 EXPORT_SYMBOL(class_connected_export);
1282
1283 static void class_disconnect_export_list(struct list_head *list,
1284                                          enum obd_option flags)
1285 {
1286         int rc;
1287         struct obd_export *exp;
1288         ENTRY;
1289
1290         /* It's possible that an export may disconnect itself, but
1291          * nothing else will be added to this list. */
1292         while (!list_empty(list)) {
1293                 exp = list_entry(list->next, struct obd_export,
1294                                  exp_obd_chain);
1295                 /* need for safe call CDEBUG after obd_disconnect */
1296                 class_export_get(exp);
1297
1298                 spin_lock(&exp->exp_lock);
1299                 exp->exp_flags = flags;
1300                 spin_unlock(&exp->exp_lock);
1301
1302                 if (obd_uuid_equals(&exp->exp_client_uuid,
1303                                     &exp->exp_obd->obd_uuid)) {
1304                         CDEBUG(D_HA,
1305                                "exp %p export uuid == obd uuid, don't discon\n",
1306                                exp);
1307                         /* Need to delete this now so we don't end up pointing
1308                          * to work_list later when this export is cleaned up. */
1309                         list_del_init(&exp->exp_obd_chain);
1310                         class_export_put(exp);
1311                         continue;
1312                 }
1313
1314                 class_export_get(exp);
1315                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1316                        "last request at "CFS_TIME_T"\n",
1317                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1318                        exp, exp->exp_last_request_time);
1319                 /* release one export reference anyway */
1320                 rc = obd_disconnect(exp);
1321
1322                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1323                        obd_export_nid2str(exp), exp, rc);
1324                 class_export_put(exp);
1325         }
1326         EXIT;
1327 }
1328
1329 void class_disconnect_exports(struct obd_device *obd)
1330 {
1331         struct list_head work_list;
1332         ENTRY;
1333
1334         /* Move all of the exports from obd_exports to a work list, en masse. */
1335         INIT_LIST_HEAD(&work_list);
1336         spin_lock(&obd->obd_dev_lock);
1337         list_splice_init(&obd->obd_exports, &work_list);
1338         list_splice_init(&obd->obd_delayed_exports, &work_list);
1339         spin_unlock(&obd->obd_dev_lock);
1340
1341         if (!list_empty(&work_list)) {
1342                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1343                        "disconnecting them\n", obd->obd_minor, obd);
1344                 class_disconnect_export_list(&work_list,
1345                                              exp_flags_from_obd(obd));
1346         } else
1347                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1348                        obd->obd_minor, obd);
1349         EXIT;
1350 }
1351 EXPORT_SYMBOL(class_disconnect_exports);
1352
1353 /* Remove exports that have not completed recovery.
1354  */
1355 void class_disconnect_stale_exports(struct obd_device *obd,
1356                                     int (*test_export)(struct obd_export *))
1357 {
1358         struct list_head work_list;
1359         struct obd_export *exp, *n;
1360         int evicted = 0;
1361         ENTRY;
1362
1363         INIT_LIST_HEAD(&work_list);
1364         spin_lock(&obd->obd_dev_lock);
1365         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1366                                  exp_obd_chain) {
1367                 /* don't count self-export as client */
1368                 if (obd_uuid_equals(&exp->exp_client_uuid,
1369                                     &exp->exp_obd->obd_uuid))
1370                         continue;
1371
1372                 /* don't evict clients which have no slot in last_rcvd
1373                  * (e.g. lightweight connection) */
1374                 if (exp->exp_target_data.ted_lr_idx == -1)
1375                         continue;
1376
1377                 spin_lock(&exp->exp_lock);
1378                 if (exp->exp_failed || test_export(exp)) {
1379                         spin_unlock(&exp->exp_lock);
1380                         continue;
1381                 }
1382                 exp->exp_failed = 1;
1383                 spin_unlock(&exp->exp_lock);
1384
1385                 list_move(&exp->exp_obd_chain, &work_list);
1386                 evicted++;
1387                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1388                        obd->obd_name, exp->exp_client_uuid.uuid,
1389                        exp->exp_connection == NULL ? "<unknown>" :
1390                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1391                 print_export_data(exp, "EVICTING", 0, D_HA);
1392         }
1393         spin_unlock(&obd->obd_dev_lock);
1394
1395         if (evicted)
1396                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1397                               obd->obd_name, evicted);
1398
1399         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1400                                                  OBD_OPT_ABORT_RECOV);
1401         EXIT;
1402 }
1403 EXPORT_SYMBOL(class_disconnect_stale_exports);
1404
1405 void class_fail_export(struct obd_export *exp)
1406 {
1407         int rc, already_failed;
1408
1409         spin_lock(&exp->exp_lock);
1410         already_failed = exp->exp_failed;
1411         exp->exp_failed = 1;
1412         spin_unlock(&exp->exp_lock);
1413
1414         if (already_failed) {
1415                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1416                        exp, exp->exp_client_uuid.uuid);
1417                 return;
1418         }
1419
1420         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1421                exp, exp->exp_client_uuid.uuid);
1422
1423         if (obd_dump_on_timeout)
1424                 libcfs_debug_dumplog();
1425
1426         /* need for safe call CDEBUG after obd_disconnect */
1427         class_export_get(exp);
1428
1429         /* Most callers into obd_disconnect are removing their own reference
1430          * (request, for example) in addition to the one from the hash table.
1431          * We don't have such a reference here, so make one. */
1432         class_export_get(exp);
1433         rc = obd_disconnect(exp);
1434         if (rc)
1435                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1436         else
1437                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1438                        exp, exp->exp_client_uuid.uuid);
1439         class_export_put(exp);
1440 }
1441 EXPORT_SYMBOL(class_fail_export);
1442
1443 char *obd_export_nid2str(struct obd_export *exp)
1444 {
1445         if (exp->exp_connection != NULL)
1446                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1447
1448         return "(no nid)";
1449 }
1450 EXPORT_SYMBOL(obd_export_nid2str);
1451
1452 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1453 {
1454         struct cfs_hash *nid_hash;
1455         struct obd_export *doomed_exp = NULL;
1456         int exports_evicted = 0;
1457
1458         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1459
1460         spin_lock(&obd->obd_dev_lock);
1461         /* umount has run already, so evict thread should leave
1462          * its task to umount thread now */
1463         if (obd->obd_stopping) {
1464                 spin_unlock(&obd->obd_dev_lock);
1465                 return exports_evicted;
1466         }
1467         nid_hash = obd->obd_nid_hash;
1468         cfs_hash_getref(nid_hash);
1469         spin_unlock(&obd->obd_dev_lock);
1470
1471         do {
1472                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1473                 if (doomed_exp == NULL)
1474                         break;
1475
1476                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1477                          "nid %s found, wanted nid %s, requested nid %s\n",
1478                          obd_export_nid2str(doomed_exp),
1479                          libcfs_nid2str(nid_key), nid);
1480                 LASSERTF(doomed_exp != obd->obd_self_export,
1481                          "self-export is hashed by NID?\n");
1482                 exports_evicted++;
1483                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1484                               "request\n", obd->obd_name,
1485                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1486                               obd_export_nid2str(doomed_exp));
1487                 class_fail_export(doomed_exp);
1488                 class_export_put(doomed_exp);
1489         } while (1);
1490
1491         cfs_hash_putref(nid_hash);
1492
1493         if (!exports_evicted)
1494                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1495                        obd->obd_name, nid);
1496         return exports_evicted;
1497 }
1498 EXPORT_SYMBOL(obd_export_evict_by_nid);
1499
1500 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1501 {
1502         struct cfs_hash *uuid_hash;
1503         struct obd_export *doomed_exp = NULL;
1504         struct obd_uuid doomed_uuid;
1505         int exports_evicted = 0;
1506
1507         spin_lock(&obd->obd_dev_lock);
1508         if (obd->obd_stopping) {
1509                 spin_unlock(&obd->obd_dev_lock);
1510                 return exports_evicted;
1511         }
1512         uuid_hash = obd->obd_uuid_hash;
1513         cfs_hash_getref(uuid_hash);
1514         spin_unlock(&obd->obd_dev_lock);
1515
1516         obd_str2uuid(&doomed_uuid, uuid);
1517         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1518                 CERROR("%s: can't evict myself\n", obd->obd_name);
1519                 cfs_hash_putref(uuid_hash);
1520                 return exports_evicted;
1521         }
1522
1523         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1524
1525         if (doomed_exp == NULL) {
1526                 CERROR("%s: can't disconnect %s: no exports found\n",
1527                        obd->obd_name, uuid);
1528         } else {
1529                 CWARN("%s: evicting %s at adminstrative request\n",
1530                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1531                 class_fail_export(doomed_exp);
1532                 class_export_put(doomed_exp);
1533                 exports_evicted++;
1534         }
1535         cfs_hash_putref(uuid_hash);
1536
1537         return exports_evicted;
1538 }
1539
1540 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook called by print_export_data() when lock dumping is
 * requested; NULL (the default for a file-scope pointer) means unset. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1543 #endif
1544
1545 static void print_export_data(struct obd_export *exp, const char *status,
1546                               int locks, int debug_level)
1547 {
1548         struct ptlrpc_reply_state *rs;
1549         struct ptlrpc_reply_state *first_reply = NULL;
1550         int nreplies = 0;
1551
1552         spin_lock(&exp->exp_lock);
1553         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1554                             rs_exp_list) {
1555                 if (nreplies == 0)
1556                         first_reply = rs;
1557                 nreplies++;
1558         }
1559         spin_unlock(&exp->exp_lock);
1560
1561         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1562                "%p %s %llu stale:%d\n",
1563                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1564                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1565                atomic_read(&exp->exp_rpc_count),
1566                atomic_read(&exp->exp_cb_count),
1567                atomic_read(&exp->exp_locks_count),
1568                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1569                nreplies, first_reply, nreplies > 3 ? "..." : "",
1570                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1571 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1572         if (locks && class_export_dump_hook != NULL)
1573                 class_export_dump_hook(exp);
1574 #endif
1575 }
1576
1577 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1578 {
1579         struct obd_export *exp;
1580
1581         spin_lock(&obd->obd_dev_lock);
1582         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1583                 print_export_data(exp, "ACTIVE", locks, debug_level);
1584         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1585                 print_export_data(exp, "UNLINKED", locks, debug_level);
1586         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1587                 print_export_data(exp, "DELAYED", locks, debug_level);
1588         spin_unlock(&obd->obd_dev_lock);
1589         spin_lock(&obd_zombie_impexp_lock);
1590         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1591                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1592         spin_unlock(&obd_zombie_impexp_lock);
1593 }
1594
1595 void obd_exports_barrier(struct obd_device *obd)
1596 {
1597         int waited = 2;
1598         LASSERT(list_empty(&obd->obd_exports));
1599         spin_lock(&obd->obd_dev_lock);
1600         while (!list_empty(&obd->obd_unlinked_exports)) {
1601                 spin_unlock(&obd->obd_dev_lock);
1602                 set_current_state(TASK_UNINTERRUPTIBLE);
1603                 schedule_timeout(cfs_time_seconds(waited));
1604                 if (waited > 5 && is_power_of_2(waited)) {
1605                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1606                                       "more than %d seconds. "
1607                                       "The obd refcount = %d. Is it stuck?\n",
1608                                       obd->obd_name, waited,
1609                                       atomic_read(&obd->obd_refcount));
1610                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1611                 }
1612                 waited *= 2;
1613                 spin_lock(&obd->obd_dev_lock);
1614         }
1615         spin_unlock(&obd->obd_dev_lock);
1616 }
1617 EXPORT_SYMBOL(obd_exports_barrier);
1618
/* Number of zombie imports/exports still waiting to be destroyed;
 * starts at zero like any file-scope static. */
static int zombies_count;
1621
1622 /**
1623  * kill zombie imports and exports
1624  */
1625 void obd_zombie_impexp_cull(void)
1626 {
1627         struct obd_import *import;
1628         struct obd_export *export;
1629         ENTRY;
1630
1631         do {
1632                 spin_lock(&obd_zombie_impexp_lock);
1633
1634                 import = NULL;
1635                 if (!list_empty(&obd_zombie_imports)) {
1636                         import = list_entry(obd_zombie_imports.next,
1637                                             struct obd_import,
1638                                             imp_zombie_chain);
1639                         list_del_init(&import->imp_zombie_chain);
1640                 }
1641
1642                 export = NULL;
1643                 if (!list_empty(&obd_zombie_exports)) {
1644                         export = list_entry(obd_zombie_exports.next,
1645                                             struct obd_export,
1646                                             exp_obd_chain);
1647                         list_del_init(&export->exp_obd_chain);
1648                 }
1649
1650                 spin_unlock(&obd_zombie_impexp_lock);
1651
1652                 if (import != NULL) {
1653                         class_import_destroy(import);
1654                         spin_lock(&obd_zombie_impexp_lock);
1655                         zombies_count--;
1656                         spin_unlock(&obd_zombie_impexp_lock);
1657                 }
1658
1659                 if (export != NULL) {
1660                         class_export_destroy(export);
1661                         spin_lock(&obd_zombie_impexp_lock);
1662                         zombies_count--;
1663                         spin_unlock(&obd_zombie_impexp_lock);
1664                 }
1665
1666                 cond_resched();
1667         } while (import != NULL || export != NULL);
1668         EXIT;
1669 }
1670
1671 static DECLARE_COMPLETION(obd_zombie_start);
1672 static DECLARE_COMPLETION(obd_zombie_stop);
1673 static unsigned long obd_zombie_flags;
1674 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1675 static pid_t obd_zombie_pid;
1676
/* Flags kept in obd_zombie_flags. */
enum {
	/* NOTE: handed to set_bit()/test_bit() as the bit number, so this
	 * selects bit 1 of obd_zombie_flags rather than the mask 0x1 */
	OBD_ZOMBIE_STOP = 0x1,
};
1680
1681 /**
1682  * check for work for kill zombie import/export thread.
1683  */
1684 static int obd_zombie_impexp_check(void *arg)
1685 {
1686         int rc;
1687
1688         spin_lock(&obd_zombie_impexp_lock);
1689         rc = (zombies_count == 0) &&
1690              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1691         spin_unlock(&obd_zombie_impexp_lock);
1692
1693         RETURN(rc);
1694 }
1695
1696 /**
1697  * Add export to the obd_zombe thread and notify it.
1698  */
1699 static void obd_zombie_export_add(struct obd_export *exp) {
1700         atomic_dec(&obd_stale_export_num);
1701         spin_lock(&exp->exp_obd->obd_dev_lock);
1702         LASSERT(!list_empty(&exp->exp_obd_chain));
1703         list_del_init(&exp->exp_obd_chain);
1704         spin_unlock(&exp->exp_obd->obd_dev_lock);
1705         spin_lock(&obd_zombie_impexp_lock);
1706         zombies_count++;
1707         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1708         spin_unlock(&obd_zombie_impexp_lock);
1709
1710         obd_zombie_impexp_notify();
1711 }
1712
1713 /**
1714  * Add import to the obd_zombe thread and notify it.
1715  */
1716 static void obd_zombie_import_add(struct obd_import *imp) {
1717         LASSERT(imp->imp_sec == NULL);
1718         spin_lock(&obd_zombie_impexp_lock);
1719         LASSERT(list_empty(&imp->imp_zombie_chain));
1720         zombies_count++;
1721         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1722         spin_unlock(&obd_zombie_impexp_lock);
1723
1724         obd_zombie_impexp_notify();
1725 }
1726
1727 /**
1728  * notify import/export destroy thread about new zombie.
1729  */
1730 static void obd_zombie_impexp_notify(void)
1731 {
1732         /*
1733          * Make sure obd_zomebie_impexp_thread get this notification.
1734          * It is possible this signal only get by obd_zombie_barrier, and
1735          * barrier gulps this notification and sleeps away and hangs ensues
1736          */
1737         wake_up_all(&obd_zombie_waitq);
1738 }
1739
1740 /**
1741  * check whether obd_zombie is idle
1742  */
1743 static int obd_zombie_is_idle(void)
1744 {
1745         int rc;
1746
1747         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1748         spin_lock(&obd_zombie_impexp_lock);
1749         rc = (zombies_count == 0);
1750         spin_unlock(&obd_zombie_impexp_lock);
1751         return rc;
1752 }
1753
1754 /**
1755  * wait when obd_zombie import/export queues become empty
1756  */
1757 void obd_zombie_barrier(void)
1758 {
1759         struct l_wait_info lwi = { 0 };
1760
1761         if (obd_zombie_pid == current_pid())
1762                 /* don't wait for myself */
1763                 return;
1764         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1765 }
1766 EXPORT_SYMBOL(obd_zombie_barrier);
1767
1768
1769 struct obd_export *obd_stale_export_get(void)
1770 {
1771         struct obd_export *exp = NULL;
1772         ENTRY;
1773
1774         spin_lock(&obd_stale_export_lock);
1775         if (!list_empty(&obd_stale_exports)) {
1776                 exp = list_entry(obd_stale_exports.next,
1777                                  struct obd_export, exp_stale_list);
1778                 list_del_init(&exp->exp_stale_list);
1779         }
1780         spin_unlock(&obd_stale_export_lock);
1781
1782         if (exp) {
1783                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1784                        atomic_read(&obd_stale_export_num));
1785         }
1786         RETURN(exp);
1787 }
1788 EXPORT_SYMBOL(obd_stale_export_get);
1789
1790 void obd_stale_export_put(struct obd_export *exp)
1791 {
1792         ENTRY;
1793
1794         LASSERT(list_empty(&exp->exp_stale_list));
1795         if (exp->exp_lock_hash &&
1796             atomic_read(&exp->exp_lock_hash->hs_count)) {
1797                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1798                        atomic_read(&obd_stale_export_num));
1799
1800                 spin_lock_bh(&exp->exp_bl_list_lock);
1801                 spin_lock(&obd_stale_export_lock);
1802                 /* Add to the tail if there is no blocked locks,
1803                  * to the head otherwise. */
1804                 if (list_empty(&exp->exp_bl_list))
1805                         list_add_tail(&exp->exp_stale_list,
1806                                       &obd_stale_exports);
1807                 else
1808                         list_add(&exp->exp_stale_list,
1809                                  &obd_stale_exports);
1810
1811                 spin_unlock(&obd_stale_export_lock);
1812                 spin_unlock_bh(&exp->exp_bl_list_lock);
1813         } else {
1814                 class_export_put(exp);
1815         }
1816         EXIT;
1817 }
1818 EXPORT_SYMBOL(obd_stale_export_put);
1819
1820 /**
1821  * Adjust the position of the export in the stale list,
1822  * i.e. move to the head of the list if is needed.
1823  **/
1824 void obd_stale_export_adjust(struct obd_export *exp)
1825 {
1826         LASSERT(exp != NULL);
1827         spin_lock_bh(&exp->exp_bl_list_lock);
1828         spin_lock(&obd_stale_export_lock);
1829
1830         if (!list_empty(&exp->exp_stale_list) &&
1831             !list_empty(&exp->exp_bl_list))
1832                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1833
1834         spin_unlock(&obd_stale_export_lock);
1835         spin_unlock_bh(&exp->exp_bl_list_lock);
1836 }
1837 EXPORT_SYMBOL(obd_stale_export_adjust);
1838
1839 /**
1840  * destroy zombie export/import thread.
1841  */
1842 static int obd_zombie_impexp_thread(void *unused)
1843 {
1844         unshare_fs_struct();
1845         complete(&obd_zombie_start);
1846
1847         obd_zombie_pid = current_pid();
1848
1849         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1850                 struct l_wait_info lwi = { 0 };
1851
1852                 l_wait_event(obd_zombie_waitq,
1853                              !obd_zombie_impexp_check(NULL), &lwi);
1854                 obd_zombie_impexp_cull();
1855
1856                 /*
1857                  * Notify obd_zombie_barrier callers that queues
1858                  * may be empty.
1859                  */
1860                 wake_up(&obd_zombie_waitq);
1861         }
1862
1863         complete(&obd_zombie_stop);
1864
1865         RETURN(0);
1866 }
1867
1868
1869 /**
1870  * start destroy zombie import/export thread
1871  */
1872 int obd_zombie_impexp_init(void)
1873 {
1874         struct task_struct *task;
1875
1876         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1877         if (IS_ERR(task))
1878                 RETURN(PTR_ERR(task));
1879
1880         wait_for_completion(&obd_zombie_start);
1881         RETURN(0);
1882 }
1883 /**
1884  * stop destroy zombie import/export thread
1885  */
1886 void obd_zombie_impexp_stop(void)
1887 {
1888         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1889         obd_zombie_impexp_notify();
1890         wait_for_completion(&obd_zombie_stop);
1891         LASSERT(list_empty(&obd_stale_exports));
1892 }
1893
1894 /***** Kernel-userspace comm helpers *******/
1895
1896 /* Get length of entire message, including header */
1897 int kuc_len(int payload_len)
1898 {
1899         return sizeof(struct kuc_hdr) + payload_len;
1900 }
1901 EXPORT_SYMBOL(kuc_len);
1902
1903 /* Get a pointer to kuc header, given a ptr to the payload
1904  * @param p Pointer to payload area
1905  * @returns Pointer to kuc header
1906  */
1907 struct kuc_hdr * kuc_ptr(void *p)
1908 {
1909         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1910         LASSERT(lh->kuc_magic == KUC_MAGIC);
1911         return lh;
1912 }
1913 EXPORT_SYMBOL(kuc_ptr);
1914
1915 /* Alloc space for a message, and fill in header
1916  * @return Pointer to payload area
1917  */
1918 void *kuc_alloc(int payload_len, int transport, int type)
1919 {
1920         struct kuc_hdr *lh;
1921         int len = kuc_len(payload_len);
1922
1923         OBD_ALLOC(lh, len);
1924         if (lh == NULL)
1925                 return ERR_PTR(-ENOMEM);
1926
1927         lh->kuc_magic = KUC_MAGIC;
1928         lh->kuc_transport = transport;
1929         lh->kuc_msgtype = type;
1930         lh->kuc_msglen = len;
1931
1932         return (void *)(lh + 1);
1933 }
1934 EXPORT_SYMBOL(kuc_alloc);
1935
/* Free a KUC message.
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size; the size handed to OBD_FREE is
 *        kuc_len(payload_len), so presumably this has to match the
 *        value given to kuc_alloc() -- confirm against OBD_FREE
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr;
	int total;

	hdr = kuc_ptr(p);
	total = kuc_len(payload_len);
	OBD_FREE(hdr, total);
}
1942 EXPORT_SYMBOL(kuc_free);
1943
1944 struct obd_request_slot_waiter {
1945         struct list_head        orsw_entry;
1946         wait_queue_head_t       orsw_waitq;
1947         bool                    orsw_signaled;
1948 };
1949
1950 static bool obd_request_slot_avail(struct client_obd *cli,
1951                                    struct obd_request_slot_waiter *orsw)
1952 {
1953         bool avail;
1954
1955         spin_lock(&cli->cl_loi_list_lock);
1956         avail = !!list_empty(&orsw->orsw_entry);
1957         spin_unlock(&cli->cl_loi_list_lock);
1958
1959         return avail;
1960 };
1961
1962 /*
1963  * For network flow control, the RPC sponsor needs to acquire a credit
1964  * before sending the RPC. The credits count for a connection is defined
1965  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1966  * the subsequent RPC sponsors need to wait until others released their
1967  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1968  */
1969 int obd_get_request_slot(struct client_obd *cli)
1970 {
1971         struct obd_request_slot_waiter   orsw;
1972         struct l_wait_info               lwi;
1973         int                              rc;
1974
1975         spin_lock(&cli->cl_loi_list_lock);
1976         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1977                 cli->cl_r_in_flight++;
1978                 spin_unlock(&cli->cl_loi_list_lock);
1979                 return 0;
1980         }
1981
1982         init_waitqueue_head(&orsw.orsw_waitq);
1983         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1984         orsw.orsw_signaled = false;
1985         spin_unlock(&cli->cl_loi_list_lock);
1986
1987         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1988         rc = l_wait_event(orsw.orsw_waitq,
1989                           obd_request_slot_avail(cli, &orsw) ||
1990                           orsw.orsw_signaled,
1991                           &lwi);
1992
1993         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1994          * freed but other (such as obd_put_request_slot) is using it. */
1995         spin_lock(&cli->cl_loi_list_lock);
1996         if (rc != 0) {
1997                 if (!orsw.orsw_signaled) {
1998                         if (list_empty(&orsw.orsw_entry))
1999                                 cli->cl_r_in_flight--;
2000                         else
2001                                 list_del(&orsw.orsw_entry);
2002                 }
2003         }
2004
2005         if (orsw.orsw_signaled) {
2006                 LASSERT(list_empty(&orsw.orsw_entry));
2007
2008                 rc = -EINTR;
2009         }
2010         spin_unlock(&cli->cl_loi_list_lock);
2011
2012         return rc;
2013 }
2014 EXPORT_SYMBOL(obd_get_request_slot);
2015
2016 void obd_put_request_slot(struct client_obd *cli)
2017 {
2018         struct obd_request_slot_waiter *orsw;
2019
2020         spin_lock(&cli->cl_loi_list_lock);
2021         cli->cl_r_in_flight--;
2022
2023         /* If there is free slot, wakeup the first waiter. */
2024         if (!list_empty(&cli->cl_loi_read_list) &&
2025             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2026                 orsw = list_entry(cli->cl_loi_read_list.next,
2027                                   struct obd_request_slot_waiter, orsw_entry);
2028                 list_del_init(&orsw->orsw_entry);
2029                 cli->cl_r_in_flight++;
2030                 wake_up(&orsw->orsw_waitq);
2031         }
2032         spin_unlock(&cli->cl_loi_list_lock);
2033 }
2034 EXPORT_SYMBOL(obd_put_request_slot);
2035
2036 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2037 {
2038         return cli->cl_max_rpcs_in_flight;
2039 }
2040 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2041
2042 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2043 {
2044         struct obd_request_slot_waiter *orsw;
2045         __u32                           old;
2046         int                             diff;
2047         int                             i;
2048         char                            *typ_name;
2049         int                             rc;
2050
2051         if (max > OBD_MAX_RIF_MAX || max < 1)
2052                 return -ERANGE;
2053
2054         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2055         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2056                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2057                  * strictly lower that max_rpcs_in_flight */
2058                 if (max < 2) {
2059                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2060                                "because it must be higher than "
2061                                "max_mod_rpcs_in_flight value",
2062                                cli->cl_import->imp_obd->obd_name);
2063                         return -ERANGE;
2064                 }
2065                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2066                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2067                         if (rc != 0)
2068                                 return rc;
2069                 }
2070         }
2071
2072         spin_lock(&cli->cl_loi_list_lock);
2073         old = cli->cl_max_rpcs_in_flight;
2074         cli->cl_max_rpcs_in_flight = max;
2075         diff = max - old;
2076
2077         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2078         for (i = 0; i < diff; i++) {
2079                 if (list_empty(&cli->cl_loi_read_list))
2080                         break;
2081
2082                 orsw = list_entry(cli->cl_loi_read_list.next,
2083                                   struct obd_request_slot_waiter, orsw_entry);
2084                 list_del_init(&orsw->orsw_entry);
2085                 cli->cl_r_in_flight++;
2086                 wake_up(&orsw->orsw_waitq);
2087         }
2088         spin_unlock(&cli->cl_loi_list_lock);
2089
2090         return 0;
2091 }
2092 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2093
2094 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2095 {
2096         return cli->cl_max_mod_rpcs_in_flight;
2097 }
2098 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2099
2100 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2101 {
2102         struct obd_connect_data *ocd;
2103         __u16 maxmodrpcs;
2104         __u16 prev;
2105
2106         if (max > OBD_MAX_RIF_MAX || max < 1)
2107                 return -ERANGE;
2108
2109         /* cannot exceed or equal max_rpcs_in_flight */
2110         if (max >= cli->cl_max_rpcs_in_flight) {
2111                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2112                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2113                        cli->cl_import->imp_obd->obd_name,
2114                        max, cli->cl_max_rpcs_in_flight);
2115                 return -ERANGE;
2116         }
2117
2118         /* cannot exceed max modify RPCs in flight supported by the server */
2119         ocd = &cli->cl_import->imp_connect_data;
2120         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2121                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2122         else
2123                 maxmodrpcs = 1;
2124         if (max > maxmodrpcs) {
2125                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2126                        "higher than max_mod_rpcs_per_client value (%hu) "
2127                        "returned by the server at connection\n",
2128                        cli->cl_import->imp_obd->obd_name,
2129                        max, maxmodrpcs);
2130                 return -ERANGE;
2131         }
2132
2133         spin_lock(&cli->cl_mod_rpcs_lock);
2134
2135         prev = cli->cl_max_mod_rpcs_in_flight;
2136         cli->cl_max_mod_rpcs_in_flight = max;
2137
2138         /* wakeup waiters if limit has been increased */
2139         if (cli->cl_max_mod_rpcs_in_flight > prev)
2140                 wake_up(&cli->cl_mod_rpcs_waitq);
2141
2142         spin_unlock(&cli->cl_mod_rpcs_lock);
2143
2144         return 0;
2145 }
2146 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2147
2148
2149 #define pct(a, b) (b ? a * 100 / b : 0)
2150 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2151                                struct seq_file *seq)
2152 {
2153         unsigned long mod_tot = 0, mod_cum;
2154         struct timespec64 now;
2155         int i;
2156
2157         ktime_get_real_ts64(&now);
2158
2159         spin_lock(&cli->cl_mod_rpcs_lock);
2160
2161         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2162                    (s64)now.tv_sec, now.tv_nsec);
2163         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2164                    cli->cl_mod_rpcs_in_flight);
2165
2166         seq_printf(seq, "\n\t\t\tmodify\n");
2167         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2168
2169         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2170
2171         mod_cum = 0;
2172         for (i = 0; i < OBD_HIST_MAX; i++) {
2173                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2174                 mod_cum += mod;
2175                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2176                            i, mod, pct(mod, mod_tot),
2177                            pct(mod_cum, mod_tot));
2178                 if (mod_cum == mod_tot)
2179                         break;
2180         }
2181
2182         spin_unlock(&cli->cl_mod_rpcs_lock);
2183
2184         return 0;
2185 }
2186 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2187 #undef pct
2188
2189
2190 /* The number of modify RPCs sent in parallel is limited
2191  * because the server has a finite number of slots per client to
2192  * store request result and ensure reply reconstruction when needed.
2193  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2194  * that takes into account server limit and cl_max_rpcs_in_flight
2195  * value.
2196  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2197  * one close request is allowed above the maximum.
2198  */
2199 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2200                                                  bool close_req)
2201 {
2202         bool avail;
2203
2204         /* A slot is available if
2205          * - number of modify RPCs in flight is less than the max
2206          * - it's a close RPC and no other close request is in flight
2207          */
2208         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2209                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2210
2211         return avail;
2212 }
2213
2214 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2215                                          bool close_req)
2216 {
2217         bool avail;
2218
2219         spin_lock(&cli->cl_mod_rpcs_lock);
2220         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2221         spin_unlock(&cli->cl_mod_rpcs_lock);
2222         return avail;
2223 }
2224
2225 /* Get a modify RPC slot from the obd client @cli according
2226  * to the kind of operation @opc that is going to be sent
2227  * and the intent @it of the operation if it applies.
2228  * If the maximum number of modify RPCs in flight is reached
2229  * the thread is put to sleep.
2230  * Returns the tag to be set in the request message. Tag 0
2231  * is reserved for non-modifying requests.
2232  */
2233 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2234                            struct lookup_intent *it)
2235 {
2236         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2237         bool                    close_req = false;
2238         __u16                   i, max;
2239
2240         /* read-only metadata RPCs don't consume a slot on MDT
2241          * for reply reconstruction
2242          */
2243         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2244                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2245                 return 0;
2246
2247         if (opc == MDS_CLOSE)
2248                 close_req = true;
2249
2250         do {
2251                 spin_lock(&cli->cl_mod_rpcs_lock);
2252                 max = cli->cl_max_mod_rpcs_in_flight;
2253                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2254                         /* there is a slot available */
2255                         cli->cl_mod_rpcs_in_flight++;
2256                         if (close_req)
2257                                 cli->cl_close_rpcs_in_flight++;
2258                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2259                                          cli->cl_mod_rpcs_in_flight);
2260                         /* find a free tag */
2261                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2262                                                 max + 1);
2263                         LASSERT(i < OBD_MAX_RIF_MAX);
2264                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2265                         spin_unlock(&cli->cl_mod_rpcs_lock);
2266                         /* tag 0 is reserved for non-modify RPCs */
2267                         return i + 1;
2268                 }
2269                 spin_unlock(&cli->cl_mod_rpcs_lock);
2270
2271                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2272                        "opc %u, max %hu\n",
2273                        cli->cl_import->imp_obd->obd_name, opc, max);
2274
2275                 l_wait_event(cli->cl_mod_rpcs_waitq,
2276                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2277         } while (true);
2278 }
2279 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2280
2281 /* Put a modify RPC slot from the obd client @cli according
2282  * to the kind of operation @opc that has been sent and the
2283  * intent @it of the operation if it applies.
2284  */
2285 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2286                           struct lookup_intent *it, __u16 tag)
2287 {
2288         bool                    close_req = false;
2289
2290         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2291                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2292                 return;
2293
2294         if (opc == MDS_CLOSE)
2295                 close_req = true;
2296
2297         spin_lock(&cli->cl_mod_rpcs_lock);
2298         cli->cl_mod_rpcs_in_flight--;
2299         if (close_req)
2300                 cli->cl_close_rpcs_in_flight--;
2301         /* release the tag in the bitmap */
2302         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2303         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2304         spin_unlock(&cli->cl_mod_rpcs_lock);
2305         wake_up(&cli->cl_mod_rpcs_waitq);
2306 }
2307 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2308