Whamcloud - gitweb
LU-13356 client: don't use OBD_CONNECT_MNE_SWAB
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Guards obd_devs[]: the lookup helpers in this file take it for read,
 * device (un)registration takes it for write. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of obd devices indexed by obd_minor; an unused slot holds NULL. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that obd_device_alloc()/obd_device_free() use. */
static struct kmem_cache *obd_device_cachep;
/* kobject type of every struct obd_type; defined further down. */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that runs deferred ("zombie")
 * export/import destruction -- its users are not in this part of the file. */
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping (list, its lock, element count);
 * the code that uses these is not visible here -- confirm. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook called by class_export_destroy() to release an export's connection.
 * NOTE(review): presumably installed by the ptlrpc module -- confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/* kobject type shared by all obd types: class_search_type() uses it to
 * recognise obd_type kobjects, and class_sysfs_release() runs when the last
 * kobject reference is dropped. */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
182
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
185 {
186         struct dentry *symlink;
187         struct obd_type *type;
188         int rc;
189
190         type = class_search_type(name);
191         if (type) {
192                 kobject_put(&type->typ_kobj);
193                 return ERR_PTR(-EEXIST);
194         }
195
196         OBD_ALLOC(type, sizeof(*type));
197         if (!type)
198                 return ERR_PTR(-ENOMEM);
199
200         type->typ_kobj.kset = lustre_kset;
201         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202                                   &lustre_kset->kobj, "%s", name);
203         if (rc)
204                 return ERR_PTR(rc);
205
206         symlink = debugfs_create_dir(name, debugfs_lustre_root);
207         type->typ_debugfs_entry = symlink;
208         type->typ_sym_filter = true;
209
210         if (enable_proc) {
211                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
212                                                       NULL, NULL);
213                 if (IS_ERR(type->typ_procroot)) {
214                         CERROR("%s: can't create compat proc entry: %d\n",
215                                name, (int)PTR_ERR(type->typ_procroot));
216                         type->typ_procroot = NULL;
217                 }
218         }
219
220         return type;
221 }
222 EXPORT_SYMBOL(class_add_symlinks);
223 #endif /* HAVE_SERVER_SUPPORT */
224
225 #define CLASS_MAX_NAME 1024
226
/*
 * Register an obd type called \a name.
 *
 * \a dt_ops and \a md_ops are stored in the type.  When \a enable_proc is
 * set a proc directory is created for it, \a vars are added to its debugfs
 * directory, and \a ldt (if not NULL) is attached and initialized with
 * lu_device_type_init().
 *
 * With server support, a placeholder left by class_add_symlinks()
 * (typ_sym_filter set) is taken over instead of allocating a new type; in
 * that case the proc/debugfs/kobject_add steps are skipped.
 *
 * Returns 0 on success, -EEXIST if the name is already registered,
 * -ENOMEM on allocation failure, or the error from lprocfs_register(),
 * kobject_add() or lu_device_type_init().
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        /* on a hit this holds a kobject reference on the type */
        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder from class_add_symlinks(): reuse it */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* only initialized here; kobject_add() happens further down */
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        /* Only true when we arrived through dir_exist, still holding the
         * reference from class_search_type(): drop it and skip the steps the
         * placeholder already went through.
         * NOTE(review): assumes OBD_ALLOC zero-fills, so a freshly
         * allocated type never takes this branch -- confirm.
         */
        if (type->typ_sym_filter) {
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* keep the release callback away from an ERR_PTR */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
        ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* dropping the reference lets class_sysfs_release() undo the rest */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
305 EXPORT_SYMBOL(class_register_type);
306
307 int class_unregister_type(const char *name)
308 {
309         struct obd_type *type = class_search_type(name);
310         int rc = 0;
311         ENTRY;
312
313         if (!type) {
314                 CERROR("unknown obd type\n");
315                 RETURN(-EINVAL);
316         }
317
318         if (atomic_read(&type->typ_refcnt)) {
319                 CERROR("type %s has refcount (%d)\n", name,
320                        atomic_read(&type->typ_refcnt));
321                 /* This is a bad situation, let's make the best of it */
322                 /* Remove ops, but leave the name for debugging */
323                 type->typ_dt_ops = NULL;
324                 type->typ_md_ops = NULL;
325                 GOTO(out_put, rc = -EBUSY);
326         }
327
328         /* Put the final ref */
329         kobject_put(&type->typ_kobj);
330 out_put:
331         /* Put the ref returned by class_search_type() */
332         kobject_put(&type->typ_kobj);
333
334         RETURN(rc);
335 } /* class_unregister_type */
336 EXPORT_SYMBOL(class_unregister_type);
337
338 /**
339  * Create a new obd device.
340  *
341  * Allocate the new obd_device and initialize it.
342  *
343  * \param[in] type_name obd device type string.
344  * \param[in] name      obd device name.
345  * \param[in] uuid      obd device UUID
346  *
347  * \retval newdev         pointer to created obd_device
348  * \retval ERR_PTR(errno) on error
349  */
350 struct obd_device *class_newdev(const char *type_name, const char *name,
351                                 const char *uuid)
352 {
353         struct obd_device *newdev;
354         struct obd_type *type = NULL;
355         ENTRY;
356
357         if (strlen(name) >= MAX_OBD_NAME) {
358                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
359                 RETURN(ERR_PTR(-EINVAL));
360         }
361
362         type = class_get_type(type_name);
363         if (type == NULL){
364                 CERROR("OBD: unknown type: %s\n", type_name);
365                 RETURN(ERR_PTR(-ENODEV));
366         }
367
368         newdev = obd_device_alloc();
369         if (newdev == NULL) {
370                 class_put_type(type);
371                 RETURN(ERR_PTR(-ENOMEM));
372         }
373         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
374         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
375         newdev->obd_type = type;
376         newdev->obd_minor = -1;
377
378         rwlock_init(&newdev->obd_pool_lock);
379         newdev->obd_pool_limit = 0;
380         newdev->obd_pool_slv = 0;
381
382         INIT_LIST_HEAD(&newdev->obd_exports);
383         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385         INIT_LIST_HEAD(&newdev->obd_exports_timed);
386         INIT_LIST_HEAD(&newdev->obd_nid_stats);
387         spin_lock_init(&newdev->obd_nid_lock);
388         spin_lock_init(&newdev->obd_dev_lock);
389         mutex_init(&newdev->obd_dev_mutex);
390         spin_lock_init(&newdev->obd_osfs_lock);
391         /* newdev->obd_osfs_age must be set to a value in the distant
392          * past to guarantee a fresh statfs is fetched on mount. */
393         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
394
395         /* XXX belongs in setup not attach  */
396         init_rwsem(&newdev->obd_observer_link_sem);
397         /* recovery data */
398         spin_lock_init(&newdev->obd_recovery_task_lock);
399         init_waitqueue_head(&newdev->obd_next_transno_waitq);
400         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
401         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404         INIT_LIST_HEAD(&newdev->obd_evict_list);
405         INIT_LIST_HEAD(&newdev->obd_lwp_list);
406
407         llog_group_init(&newdev->obd_olg);
408         /* Detach drops this */
409         atomic_set(&newdev->obd_refcount, 1);
410         lu_ref_init(&newdev->obd_reference);
411         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
412
413         newdev->obd_conn_inprogress = 0;
414
415         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
416
417         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418                newdev->obd_name, newdev);
419
420         return newdev;
421 }
422
423 /**
424  * Free obd device.
425  *
426  * \param[in] obd obd_device to be freed
427  *
428  * \retval none
429  */
430 void class_free_dev(struct obd_device *obd)
431 {
432         struct obd_type *obd_type = obd->obd_type;
433
434         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
435                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
436         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
437                  "obd %p != obd_devs[%d] %p\n",
438                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
439         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
440                  "obd_refcount should be 0, not %d\n",
441                  atomic_read(&obd->obd_refcount));
442         LASSERT(obd_type != NULL);
443
444         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
445                obd->obd_name, obd->obd_type->typ_name);
446
447         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
448                          obd->obd_name, obd->obd_uuid.uuid);
449         if (obd->obd_stopping) {
450                 int err;
451
452                 /* If we're not stopping, we were never set up */
453                 err = obd_cleanup(obd);
454                 if (err)
455                         CERROR("Cleanup %s returned %d\n",
456                                 obd->obd_name, err);
457         }
458
459         obd_device_free(obd);
460
461         class_put_type(obd_type);
462 }
463
464 /**
465  * Unregister obd device.
466  *
467  * Free slot in obd_dev[] used by \a obd.
468  *
469  * \param[in] new_obd obd_device to be unregistered
470  *
471  * \retval none
472  */
473 void class_unregister_device(struct obd_device *obd)
474 {
475         write_lock(&obd_dev_lock);
476         if (obd->obd_minor >= 0) {
477                 LASSERT(obd_devs[obd->obd_minor] == obd);
478                 obd_devs[obd->obd_minor] = NULL;
479                 obd->obd_minor = -1;
480         }
481         write_unlock(&obd_dev_lock);
482 }
483
484 /**
485  * Register obd device.
486  *
487  * Find free slot in obd_devs[], fills it with \a new_obd.
488  *
489  * \param[in] new_obd obd_device to be registered
490  *
491  * \retval 0          success
492  * \retval -EEXIST    device with this name is registered
493  * \retval -EOVERFLOW obd_devs[] is full
494  */
495 int class_register_device(struct obd_device *new_obd)
496 {
497         int ret = 0;
498         int i;
499         int new_obd_minor = 0;
500         bool minor_assign = false;
501         bool retried = false;
502
503 again:
504         write_lock(&obd_dev_lock);
505         for (i = 0; i < class_devno_max(); i++) {
506                 struct obd_device *obd = class_num2obd(i);
507
508                 if (obd != NULL &&
509                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
510
511                         if (!retried) {
512                                 write_unlock(&obd_dev_lock);
513
514                                 /* the obd_device could be waited to be
515                                  * destroyed by the "obd_zombie_impexp_thread".
516                                  */
517                                 obd_zombie_barrier();
518                                 retried = true;
519                                 goto again;
520                         }
521
522                         CERROR("%s: already exists, won't add\n",
523                                obd->obd_name);
524                         /* in case we found a free slot before duplicate */
525                         minor_assign = false;
526                         ret = -EEXIST;
527                         break;
528                 }
529                 if (!minor_assign && obd == NULL) {
530                         new_obd_minor = i;
531                         minor_assign = true;
532                 }
533         }
534
535         if (minor_assign) {
536                 new_obd->obd_minor = new_obd_minor;
537                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
538                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
539                 obd_devs[new_obd_minor] = new_obd;
540         } else {
541                 if (ret == 0) {
542                         ret = -EOVERFLOW;
543                         CERROR("%s: all %u/%u devices used, increase "
544                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
545                                i, class_devno_max(), ret);
546                 }
547         }
548         write_unlock(&obd_dev_lock);
549
550         RETURN(ret);
551 }
552
553 static int class_name2dev_nolock(const char *name)
554 {
555         int i;
556
557         if (!name)
558                 return -1;
559
560         for (i = 0; i < class_devno_max(); i++) {
561                 struct obd_device *obd = class_num2obd(i);
562
563                 if (obd && strcmp(name, obd->obd_name) == 0) {
564                         /* Make sure we finished attaching before we give
565                            out any references */
566                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
567                         if (obd->obd_attached) {
568                                 return i;
569                         }
570                         break;
571                 }
572         }
573
574         return -1;
575 }
576
577 int class_name2dev(const char *name)
578 {
579         int i;
580
581         if (!name)
582                 return -1;
583
584         read_lock(&obd_dev_lock);
585         i = class_name2dev_nolock(name);
586         read_unlock(&obd_dev_lock);
587
588         return i;
589 }
590 EXPORT_SYMBOL(class_name2dev);
591
592 struct obd_device *class_name2obd(const char *name)
593 {
594         int dev = class_name2dev(name);
595
596         if (dev < 0 || dev > class_devno_max())
597                 return NULL;
598         return class_num2obd(dev);
599 }
600 EXPORT_SYMBOL(class_name2obd);
601
602 int class_uuid2dev_nolock(struct obd_uuid *uuid)
603 {
604         int i;
605
606         for (i = 0; i < class_devno_max(); i++) {
607                 struct obd_device *obd = class_num2obd(i);
608
609                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
610                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
611                         return i;
612                 }
613         }
614
615         return -1;
616 }
617
618 int class_uuid2dev(struct obd_uuid *uuid)
619 {
620         int i;
621
622         read_lock(&obd_dev_lock);
623         i = class_uuid2dev_nolock(uuid);
624         read_unlock(&obd_dev_lock);
625
626         return i;
627 }
628 EXPORT_SYMBOL(class_uuid2dev);
629
630 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
631 {
632         int dev = class_uuid2dev(uuid);
633         if (dev < 0)
634                 return NULL;
635         return class_num2obd(dev);
636 }
637 EXPORT_SYMBOL(class_uuid2obd);
638
639 /**
640  * Get obd device from ::obd_devs[]
641  *
642  * \param num [in] array index
643  *
644  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
645  *         otherwise return the obd device there.
646  */
647 struct obd_device *class_num2obd(int num)
648 {
649         struct obd_device *obd = NULL;
650
651         if (num < class_devno_max()) {
652                 obd = obd_devs[num];
653                 if (obd == NULL)
654                         return NULL;
655
656                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
657                          "%p obd_magic %08x != %08x\n",
658                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
659                 LASSERTF(obd->obd_minor == num,
660                          "%p obd_minor %0d != %0d\n",
661                          obd, obd->obd_minor, num);
662         }
663
664         return obd;
665 }
666
667 /**
668  * Find obd in obd_dev[] by name or uuid.
669  *
670  * Increment obd's refcount if found.
671  *
672  * \param[in] str obd name or uuid
673  *
674  * \retval NULL    if not found
675  * \retval target  pointer to found obd_device
676  */
677 struct obd_device *class_dev_by_str(const char *str)
678 {
679         struct obd_device *target = NULL;
680         struct obd_uuid tgtuuid;
681         int rc;
682
683         obd_str2uuid(&tgtuuid, str);
684
685         read_lock(&obd_dev_lock);
686         rc = class_uuid2dev_nolock(&tgtuuid);
687         if (rc < 0)
688                 rc = class_name2dev_nolock(str);
689
690         if (rc >= 0)
691                 target = class_num2obd(rc);
692
693         if (target != NULL)
694                 class_incref(target, "find", current);
695         read_unlock(&obd_dev_lock);
696
697         RETURN(target);
698 }
699 EXPORT_SYMBOL(class_dev_by_str);
700
701 /**
702  * Get obd devices count. Device in any
703  *    state are counted
704  * \retval obd device count
705  */
706 int get_devices_count(void)
707 {
708         int index, max_index = class_devno_max(), dev_count = 0;
709
710         read_lock(&obd_dev_lock);
711         for (index = 0; index <= max_index; index++) {
712                 struct obd_device *obd = class_num2obd(index);
713                 if (obd != NULL)
714                         dev_count++;
715         }
716         read_unlock(&obd_dev_lock);
717
718         return dev_count;
719 }
720 EXPORT_SYMBOL(get_devices_count);
721
722 void class_obd_list(void)
723 {
724         char *status;
725         int i;
726
727         read_lock(&obd_dev_lock);
728         for (i = 0; i < class_devno_max(); i++) {
729                 struct obd_device *obd = class_num2obd(i);
730
731                 if (obd == NULL)
732                         continue;
733                 if (obd->obd_stopping)
734                         status = "ST";
735                 else if (obd->obd_set_up)
736                         status = "UP";
737                 else if (obd->obd_attached)
738                         status = "AT";
739                 else
740                         status = "--";
741                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742                          i, status, obd->obd_type->typ_name,
743                          obd->obd_name, obd->obd_uuid.uuid,
744                          atomic_read(&obd->obd_refcount));
745         }
746         read_unlock(&obd_dev_lock);
747 }
748
749 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
750  * specified, then only the client with that uuid is returned,
751  * otherwise any client connected to the tgt is returned.
752  */
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754                                          const char *type_name,
755                                          struct obd_uuid *grp_uuid)
756 {
757         int i;
758
759         read_lock(&obd_dev_lock);
760         for (i = 0; i < class_devno_max(); i++) {
761                 struct obd_device *obd = class_num2obd(i);
762
763                 if (obd == NULL)
764                         continue;
765                 if ((strncmp(obd->obd_type->typ_name, type_name,
766                              strlen(type_name)) == 0)) {
767                         if (obd_uuid_equals(tgt_uuid,
768                                             &obd->u.cli.cl_target_uuid) &&
769                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
770                                                          &obd->obd_uuid) : 1)) {
771                                 read_unlock(&obd_dev_lock);
772                                 return obd;
773                         }
774                 }
775         }
776         read_unlock(&obd_dev_lock);
777
778         return NULL;
779 }
780 EXPORT_SYMBOL(class_find_client_obd);
781
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783  * searching at *next, and if a device is found, the next index to look
784  * at is saved in *next. If next is NULL, then the first matching device
785  * will always be returned.
786  */
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
788 {
789         int i;
790
791         if (next == NULL)
792                 i = 0;
793         else if (*next >= 0 && *next < class_devno_max())
794                 i = *next;
795         else
796                 return NULL;
797
798         read_lock(&obd_dev_lock);
799         for (; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805                         if (next != NULL)
806                                 *next = i+1;
807                         read_unlock(&obd_dev_lock);
808                         return obd;
809                 }
810         }
811         read_unlock(&obd_dev_lock);
812
813         return NULL;
814 }
815 EXPORT_SYMBOL(class_devices_in_group);
816
817 /**
818  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
819  * adjust sptlrpc settings accordingly.
820  */
821 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
822 {
823         struct obd_device  *obd;
824         const char         *type;
825         int                 i, rc = 0, rc2;
826
827         LASSERT(namelen > 0);
828
829         read_lock(&obd_dev_lock);
830         for (i = 0; i < class_devno_max(); i++) {
831                 obd = class_num2obd(i);
832
833                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
834                         continue;
835
836                 /* only notify mdc, osc, osp, lwp, mdt, ost
837                  * because only these have a -sptlrpc llog */
838                 type = obd->obd_type->typ_name;
839                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
840                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
841                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
842                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
843                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
844                     strcmp(type, LUSTRE_OST_NAME) != 0)
845                         continue;
846
847                 if (strncmp(obd->obd_name, fsname, namelen))
848                         continue;
849
850                 class_incref(obd, __FUNCTION__, obd);
851                 read_unlock(&obd_dev_lock);
852                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
853                                          sizeof(KEY_SPTLRPC_CONF),
854                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
855                 rc = rc ? rc : rc2;
856                 class_decref(obd, __FUNCTION__, obd);
857                 read_lock(&obd_dev_lock);
858         }
859         read_unlock(&obd_dev_lock);
860         return rc;
861 }
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
863
864 void obd_cleanup_caches(void)
865 {
866         ENTRY;
867         if (obd_device_cachep) {
868                 kmem_cache_destroy(obd_device_cachep);
869                 obd_device_cachep = NULL;
870         }
871
872         EXIT;
873 }
874
875 int obd_init_caches(void)
876 {
877         int rc;
878         ENTRY;
879
880         LASSERT(obd_device_cachep == NULL);
881         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882                                 sizeof(struct obd_device),
883                                 0, 0, 0, sizeof(struct obd_device), NULL);
884         if (!obd_device_cachep)
885                 GOTO(out, rc = -ENOMEM);
886
887         RETURN(0);
888 out:
889         obd_cleanup_caches();
890         RETURN(rc);
891 }
892
/* Owner tag that class_conn2export() passes to class_handle2object() when
 * it resolves a handle cookie to an export. */
static const char export_handle_owner[] = "export";
894
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
897 {
898         struct obd_export *export;
899         ENTRY;
900
901         if (!conn) {
902                 CDEBUG(D_CACHE, "looking for null handle\n");
903                 RETURN(NULL);
904         }
905
906         if (conn->cookie == -1) {  /* this means assign a new connection */
907                 CDEBUG(D_CACHE, "want a new connection\n");
908                 RETURN(NULL);
909         }
910
911         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912         export = class_handle2object(conn->cookie, export_handle_owner);
913         RETURN(export);
914 }
915 EXPORT_SYMBOL(class_conn2export);
916
917 struct obd_device *class_exp2obd(struct obd_export *exp)
918 {
919         if (exp)
920                 return exp->exp_obd;
921         return NULL;
922 }
923 EXPORT_SYMBOL(class_exp2obd);
924
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
926 {
927         struct obd_device *obd = exp->exp_obd;
928         if (obd == NULL)
929                 return NULL;
930         return obd->u.cli.cl_import;
931 }
932 EXPORT_SYMBOL(class_exp2cliimp);
933
934 /* Export management functions */
935 static void class_export_destroy(struct obd_export *exp)
936 {
937         struct obd_device *obd = exp->exp_obd;
938         ENTRY;
939
940         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941         LASSERT(obd != NULL);
942
943         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944                exp->exp_client_uuid.uuid, obd->obd_name);
945
946         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947         if (exp->exp_connection)
948                 ptlrpc_put_connection_superhack(exp->exp_connection);
949
950         LASSERT(list_empty(&exp->exp_outstanding_replies));
951         LASSERT(list_empty(&exp->exp_uncommitted_replies));
952         LASSERT(list_empty(&exp->exp_req_replay_queue));
953         LASSERT(list_empty(&exp->exp_hp_rpcs));
954         obd_destroy_export(exp);
955         /* self export doesn't hold a reference to an obd, although it
956          * exists until freeing of the obd */
957         if (exp != obd->obd_self_export)
958                 class_decref(obd, "export", exp);
959
960         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
961         kfree_rcu(exp, exp_handle.h_rcu);
962         EXIT;
963 }
964
965 struct obd_export *class_export_get(struct obd_export *exp)
966 {
967         refcount_inc(&exp->exp_handle.h_ref);
968         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
969                refcount_read(&exp->exp_handle.h_ref));
970         return exp;
971 }
972 EXPORT_SYMBOL(class_export_get);
973
974 void class_export_put(struct obd_export *exp)
975 {
976         LASSERT(exp != NULL);
977         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
979         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
980                refcount_read(&exp->exp_handle.h_ref) - 1);
981
982         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
983                 struct obd_device *obd = exp->exp_obd;
984
985                 CDEBUG(D_IOCTL, "final put %p/%s\n",
986                        exp, exp->exp_client_uuid.uuid);
987
988                 /* release nid stat refererence */
989                 lprocfs_exp_cleanup(exp);
990
991                 if (exp == obd->obd_self_export) {
992                         /* self export should be destroyed without
993                          * zombie thread as it doesn't hold a
994                          * reference to obd and doesn't hold any
995                          * resources */
996                         class_export_destroy(exp);
997                         /* self export is destroyed, no class
998                          * references exist and it is safe to free
999                          * obd */
1000                         class_free_dev(obd);
1001                 } else {
1002                         LASSERT(!list_empty(&exp->exp_obd_chain));
1003                         obd_zombie_export_add(exp);
1004                 }
1005
1006         }
1007 }
1008 EXPORT_SYMBOL(class_export_put);
1009
1010 static void obd_zombie_exp_cull(struct work_struct *ws)
1011 {
1012         struct obd_export *export;
1013
1014         export = container_of(ws, struct obd_export, exp_zombie_work);
1015         class_export_destroy(export);
1016 }
1017
1018 /* Creates a new export, adds it to the hash table, and returns a
1019  * pointer to it. The refcount is 2: one for the hash reference, and
1020  * one for the pointer returned by this function. */
1021 struct obd_export *__class_new_export(struct obd_device *obd,
1022                                       struct obd_uuid *cluuid, bool is_self)
1023 {
1024         struct obd_export *export;
1025         int rc = 0;
1026         ENTRY;
1027
1028         OBD_ALLOC_PTR(export);
1029         if (!export)
1030                 return ERR_PTR(-ENOMEM);
1031
1032         export->exp_conn_cnt = 0;
1033         export->exp_lock_hash = NULL;
1034         export->exp_flock_hash = NULL;
1035         /* 2 = class_handle_hash + last */
1036         refcount_set(&export->exp_handle.h_ref, 2);
1037         atomic_set(&export->exp_rpc_count, 0);
1038         atomic_set(&export->exp_cb_count, 0);
1039         atomic_set(&export->exp_locks_count, 0);
1040 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1041         INIT_LIST_HEAD(&export->exp_locks_list);
1042         spin_lock_init(&export->exp_locks_list_guard);
1043 #endif
1044         atomic_set(&export->exp_replay_count, 0);
1045         export->exp_obd = obd;
1046         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1047         spin_lock_init(&export->exp_uncommitted_replies_lock);
1048         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1049         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1050         INIT_HLIST_NODE(&export->exp_handle.h_link);
1051         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1052         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1053         class_handle_hash(&export->exp_handle, export_handle_owner);
1054         export->exp_last_request_time = ktime_get_real_seconds();
1055         spin_lock_init(&export->exp_lock);
1056         spin_lock_init(&export->exp_rpc_lock);
1057         INIT_HLIST_NODE(&export->exp_nid_hash);
1058         INIT_HLIST_NODE(&export->exp_gen_hash);
1059         spin_lock_init(&export->exp_bl_list_lock);
1060         INIT_LIST_HEAD(&export->exp_bl_list);
1061         INIT_LIST_HEAD(&export->exp_stale_list);
1062         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1063
1064         export->exp_sp_peer = LUSTRE_SP_ANY;
1065         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066         export->exp_client_uuid = *cluuid;
1067         obd_init_export(export);
1068
1069         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1070
1071         spin_lock(&obd->obd_dev_lock);
1072         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073                 /* shouldn't happen, but might race */
1074                 if (obd->obd_stopping)
1075                         GOTO(exit_unlock, rc = -ENODEV);
1076
1077                 rc = obd_uuid_add(obd, export);
1078                 if (rc != 0) {
1079                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080                                       obd->obd_name, cluuid->uuid, rc);
1081                         GOTO(exit_unlock, rc = -EALREADY);
1082                 }
1083         }
1084
1085         if (!is_self) {
1086                 class_incref(obd, "export", export);
1087                 list_add_tail(&export->exp_obd_chain_timed,
1088                               &obd->obd_exports_timed);
1089                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090                 obd->obd_num_exports++;
1091         } else {
1092                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093                 INIT_LIST_HEAD(&export->exp_obd_chain);
1094         }
1095         spin_unlock(&obd->obd_dev_lock);
1096         RETURN(export);
1097
1098 exit_unlock:
1099         spin_unlock(&obd->obd_dev_lock);
1100         class_handle_unhash(&export->exp_handle);
1101         obd_destroy_export(export);
1102         OBD_FREE_PTR(export);
1103         return ERR_PTR(rc);
1104 }
1105
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107                                     struct obd_uuid *uuid)
1108 {
1109         return __class_new_export(obd, uuid, false);
1110 }
1111 EXPORT_SYMBOL(class_new_export);
1112
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114                                          struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, true);
1117 }
1118
1119 void class_unlink_export(struct obd_export *exp)
1120 {
1121         class_handle_unhash(&exp->exp_handle);
1122
1123         if (exp->exp_obd->obd_self_export == exp) {
1124                 class_export_put(exp);
1125                 return;
1126         }
1127
1128         spin_lock(&exp->exp_obd->obd_dev_lock);
1129         /* delete an uuid-export hashitem from hashtables */
1130         if (exp != exp->exp_obd->obd_self_export)
1131                 obd_uuid_del(exp->exp_obd, exp);
1132
1133 #ifdef HAVE_SERVER_SUPPORT
1134         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135                 struct tg_export_data   *ted = &exp->exp_target_data;
1136                 struct cfs_hash         *hash;
1137
1138                 /* Because obd_gen_hash will not be released until
1139                  * class_cleanup(), so hash should never be NULL here */
1140                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141                 LASSERT(hash != NULL);
1142                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143                              &exp->exp_gen_hash);
1144                 cfs_hash_putref(hash);
1145         }
1146 #endif /* HAVE_SERVER_SUPPORT */
1147
1148         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149         list_del_init(&exp->exp_obd_chain_timed);
1150         exp->exp_obd->obd_num_exports--;
1151         spin_unlock(&exp->exp_obd->obd_dev_lock);
1152         atomic_inc(&obd_stale_export_num);
1153
1154         /* A reference is kept by obd_stale_exports list */
1155         obd_stale_export_put(exp);
1156 }
1157 EXPORT_SYMBOL(class_unlink_export);
1158
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1161 {
1162         ENTRY;
1163
1164         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1165                 imp->imp_obd->obd_name);
1166
1167         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1168
1169         ptlrpc_put_connection_superhack(imp->imp_connection);
1170
1171         while (!list_empty(&imp->imp_conn_list)) {
1172                 struct obd_import_conn *imp_conn;
1173
1174                 imp_conn = list_first_entry(&imp->imp_conn_list,
1175                                             struct obd_import_conn, oic_item);
1176                 list_del_init(&imp_conn->oic_item);
1177                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1178                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1179         }
1180
1181         LASSERT(imp->imp_sec == NULL);
1182         class_decref(imp->imp_obd, "import", imp);
1183         OBD_FREE_PTR(imp);
1184         EXIT;
1185 }
1186
1187 struct obd_import *class_import_get(struct obd_import *import)
1188 {
1189         refcount_inc(&import->imp_refcount);
1190         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1191                refcount_read(&import->imp_refcount),
1192                import->imp_obd->obd_name);
1193         return import;
1194 }
1195 EXPORT_SYMBOL(class_import_get);
1196
1197 void class_import_put(struct obd_import *imp)
1198 {
1199         ENTRY;
1200
1201         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1202
1203         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1204                refcount_read(&imp->imp_refcount) - 1,
1205                imp->imp_obd->obd_name);
1206
1207         if (refcount_dec_and_test(&imp->imp_refcount)) {
1208                 CDEBUG(D_INFO, "final put import %p\n", imp);
1209                 obd_zombie_import_add(imp);
1210         }
1211
1212         EXIT;
1213 }
1214 EXPORT_SYMBOL(class_import_put);
1215
1216 static void init_imp_at(struct imp_at *at) {
1217         int i;
1218         at_init(&at->iat_net_latency, 0, 0);
1219         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1220                 /* max service estimates are tracked on the server side, so
1221                    don't use the AT history here, just use the last reported
1222                    val. (But keep hist for proc histogram, worst_ever) */
1223                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1224                         AT_FLG_NOHIST);
1225         }
1226 }
1227
1228 static void obd_zombie_imp_cull(struct work_struct *ws)
1229 {
1230         struct obd_import *import;
1231
1232         import = container_of(ws, struct obd_import, imp_zombie_work);
1233         obd_zombie_import_free(import);
1234 }
1235
1236 struct obd_import *class_new_import(struct obd_device *obd)
1237 {
1238         struct obd_import *imp;
1239         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1240
1241         OBD_ALLOC(imp, sizeof(*imp));
1242         if (imp == NULL)
1243                 return NULL;
1244
1245         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1246         INIT_LIST_HEAD(&imp->imp_replay_list);
1247         INIT_LIST_HEAD(&imp->imp_sending_list);
1248         INIT_LIST_HEAD(&imp->imp_delayed_list);
1249         INIT_LIST_HEAD(&imp->imp_committed_list);
1250         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1251         imp->imp_known_replied_xid = 0;
1252         imp->imp_replay_cursor = &imp->imp_committed_list;
1253         spin_lock_init(&imp->imp_lock);
1254         imp->imp_last_success_conn = 0;
1255         imp->imp_state = LUSTRE_IMP_NEW;
1256         imp->imp_obd = class_incref(obd, "import", imp);
1257         rwlock_init(&imp->imp_sec_lock);
1258         init_waitqueue_head(&imp->imp_recovery_waitq);
1259         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1260
1261         if (curr_pid_ns && curr_pid_ns->child_reaper)
1262                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1263         else
1264                 imp->imp_sec_refpid = 1;
1265
1266         refcount_set(&imp->imp_refcount, 2);
1267         atomic_set(&imp->imp_unregistering, 0);
1268         atomic_set(&imp->imp_inflight, 0);
1269         atomic_set(&imp->imp_replay_inflight, 0);
1270         atomic_set(&imp->imp_inval_count, 0);
1271         INIT_LIST_HEAD(&imp->imp_conn_list);
1272         init_imp_at(&imp->imp_at);
1273
1274         /* the default magic is V2, will be used in connect RPC, and
1275          * then adjusted according to the flags in request/reply. */
1276         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1277
1278         return imp;
1279 }
1280 EXPORT_SYMBOL(class_new_import);
1281
1282 void class_destroy_import(struct obd_import *import)
1283 {
1284         LASSERT(import != NULL);
1285         LASSERT(import != LP_POISON);
1286
1287         spin_lock(&import->imp_lock);
1288         import->imp_generation++;
1289         spin_unlock(&import->imp_lock);
1290         class_import_put(import);
1291 }
1292 EXPORT_SYMBOL(class_destroy_import);
1293
1294 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1295
1296 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1297 {
1298         spin_lock(&exp->exp_locks_list_guard);
1299
1300         LASSERT(lock->l_exp_refs_nr >= 0);
1301
1302         if (lock->l_exp_refs_target != NULL &&
1303             lock->l_exp_refs_target != exp) {
1304                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1305                               exp, lock, lock->l_exp_refs_target);
1306         }
1307         if ((lock->l_exp_refs_nr ++) == 0) {
1308                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1309                 lock->l_exp_refs_target = exp;
1310         }
1311         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1312                lock, exp, lock->l_exp_refs_nr);
1313         spin_unlock(&exp->exp_locks_list_guard);
1314 }
1315 EXPORT_SYMBOL(__class_export_add_lock_ref);
1316
1317 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1318 {
1319         spin_lock(&exp->exp_locks_list_guard);
1320         LASSERT(lock->l_exp_refs_nr > 0);
1321         if (lock->l_exp_refs_target != exp) {
1322                 LCONSOLE_WARN("lock %p, "
1323                               "mismatching export pointers: %p, %p\n",
1324                               lock, lock->l_exp_refs_target, exp);
1325         }
1326         if (-- lock->l_exp_refs_nr == 0) {
1327                 list_del_init(&lock->l_exp_refs_link);
1328                 lock->l_exp_refs_target = NULL;
1329         }
1330         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1331                lock, exp, lock->l_exp_refs_nr);
1332         spin_unlock(&exp->exp_locks_list_guard);
1333 }
1334 EXPORT_SYMBOL(__class_export_del_lock_ref);
1335 #endif
1336
1337 /* A connection defines an export context in which preallocation can
1338    be managed. This releases the export pointer reference, and returns
1339    the export handle, so the export refcount is 1 when this function
1340    returns. */
1341 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1342                   struct obd_uuid *cluuid)
1343 {
1344         struct obd_export *export;
1345         LASSERT(conn != NULL);
1346         LASSERT(obd != NULL);
1347         LASSERT(cluuid != NULL);
1348         ENTRY;
1349
1350         export = class_new_export(obd, cluuid);
1351         if (IS_ERR(export))
1352                 RETURN(PTR_ERR(export));
1353
1354         conn->cookie = export->exp_handle.h_cookie;
1355         class_export_put(export);
1356
1357         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1358                cluuid->uuid, conn->cookie);
1359         RETURN(0);
1360 }
1361 EXPORT_SYMBOL(class_connect);
1362
1363 /* if export is involved in recovery then clean up related things */
1364 static void class_export_recovery_cleanup(struct obd_export *exp)
1365 {
1366         struct obd_device *obd = exp->exp_obd;
1367
1368         spin_lock(&obd->obd_recovery_task_lock);
1369         if (obd->obd_recovering) {
1370                 if (exp->exp_in_recovery) {
1371                         spin_lock(&exp->exp_lock);
1372                         exp->exp_in_recovery = 0;
1373                         spin_unlock(&exp->exp_lock);
1374                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1375                         atomic_dec(&obd->obd_connected_clients);
1376                 }
1377
1378                 /* if called during recovery then should update
1379                  * obd_stale_clients counter,
1380                  * lightweight exports are not counted */
1381                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1382                         exp->exp_obd->obd_stale_clients++;
1383         }
1384         spin_unlock(&obd->obd_recovery_task_lock);
1385
1386         spin_lock(&exp->exp_lock);
1387         /** Cleanup req replay fields */
1388         if (exp->exp_req_replay_needed) {
1389                 exp->exp_req_replay_needed = 0;
1390
1391                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1392                 atomic_dec(&obd->obd_req_replay_clients);
1393         }
1394
1395         /** Cleanup lock replay data */
1396         if (exp->exp_lock_replay_needed) {
1397                 exp->exp_lock_replay_needed = 0;
1398
1399                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1400                 atomic_dec(&obd->obd_lock_replay_clients);
1401         }
1402         spin_unlock(&exp->exp_lock);
1403 }
1404
1405 /* This function removes 1-3 references from the export:
1406  * 1 - for export pointer passed
1407  * and if disconnect really need
1408  * 2 - removing from hash
1409  * 3 - in client_unlink_export
1410  * The export pointer passed to this function can destroyed */
1411 int class_disconnect(struct obd_export *export)
1412 {
1413         int already_disconnected;
1414         ENTRY;
1415
1416         if (export == NULL) {
1417                 CWARN("attempting to free NULL export %p\n", export);
1418                 RETURN(-EINVAL);
1419         }
1420
1421         spin_lock(&export->exp_lock);
1422         already_disconnected = export->exp_disconnected;
1423         export->exp_disconnected = 1;
1424         /*  We hold references of export for uuid hash
1425          *  and nid_hash and export link at least. So
1426          *  it is safe to call cfs_hash_del in there.  */
1427         if (!hlist_unhashed(&export->exp_nid_hash))
1428                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1429                              &export->exp_connection->c_peer.nid,
1430                              &export->exp_nid_hash);
1431         spin_unlock(&export->exp_lock);
1432
1433         /* class_cleanup(), abort_recovery(), and class_fail_export()
1434          * all end up in here, and if any of them race we shouldn't
1435          * call extra class_export_puts(). */
1436         if (already_disconnected) {
1437                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1438                 GOTO(no_disconn, already_disconnected);
1439         }
1440
1441         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1442                export->exp_handle.h_cookie);
1443
1444         class_export_recovery_cleanup(export);
1445         class_unlink_export(export);
1446 no_disconn:
1447         class_export_put(export);
1448         RETURN(0);
1449 }
1450 EXPORT_SYMBOL(class_disconnect);
1451
1452 /* Return non-zero for a fully connected export */
1453 int class_connected_export(struct obd_export *exp)
1454 {
1455         int connected = 0;
1456
1457         if (exp) {
1458                 spin_lock(&exp->exp_lock);
1459                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1460                 spin_unlock(&exp->exp_lock);
1461         }
1462         return connected;
1463 }
1464 EXPORT_SYMBOL(class_connected_export);
1465
1466 static void class_disconnect_export_list(struct list_head *list,
1467                                          enum obd_option flags)
1468 {
1469         int rc;
1470         struct obd_export *exp;
1471         ENTRY;
1472
1473         /* It's possible that an export may disconnect itself, but
1474          * nothing else will be added to this list. */
1475         while (!list_empty(list)) {
1476                 exp = list_first_entry(list, struct obd_export,
1477                                        exp_obd_chain);
1478                 /* need for safe call CDEBUG after obd_disconnect */
1479                 class_export_get(exp);
1480
1481                 spin_lock(&exp->exp_lock);
1482                 exp->exp_flags = flags;
1483                 spin_unlock(&exp->exp_lock);
1484
1485                 if (obd_uuid_equals(&exp->exp_client_uuid,
1486                                     &exp->exp_obd->obd_uuid)) {
1487                         CDEBUG(D_HA,
1488                                "exp %p export uuid == obd uuid, don't discon\n",
1489                                exp);
1490                         /* Need to delete this now so we don't end up pointing
1491                          * to work_list later when this export is cleaned up. */
1492                         list_del_init(&exp->exp_obd_chain);
1493                         class_export_put(exp);
1494                         continue;
1495                 }
1496
1497                 class_export_get(exp);
1498                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1499                        "last request at %lld\n",
1500                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1501                        exp, exp->exp_last_request_time);
1502                 /* release one export reference anyway */
1503                 rc = obd_disconnect(exp);
1504
1505                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1506                        obd_export_nid2str(exp), exp, rc);
1507                 class_export_put(exp);
1508         }
1509         EXIT;
1510 }
1511
1512 void class_disconnect_exports(struct obd_device *obd)
1513 {
1514         LIST_HEAD(work_list);
1515         ENTRY;
1516
1517         /* Move all of the exports from obd_exports to a work list, en masse. */
1518         spin_lock(&obd->obd_dev_lock);
1519         list_splice_init(&obd->obd_exports, &work_list);
1520         list_splice_init(&obd->obd_delayed_exports, &work_list);
1521         spin_unlock(&obd->obd_dev_lock);
1522
1523         if (!list_empty(&work_list)) {
1524                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1525                        "disconnecting them\n", obd->obd_minor, obd);
1526                 class_disconnect_export_list(&work_list,
1527                                              exp_flags_from_obd(obd));
1528         } else
1529                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1530                        obd->obd_minor, obd);
1531         EXIT;
1532 }
1533 EXPORT_SYMBOL(class_disconnect_exports);
1534
1535 /* Remove exports that have not completed recovery.
1536  */
1537 void class_disconnect_stale_exports(struct obd_device *obd,
1538                                     int (*test_export)(struct obd_export *))
1539 {
1540         LIST_HEAD(work_list);
1541         struct obd_export *exp, *n;
1542         int evicted = 0;
1543         ENTRY;
1544
1545         spin_lock(&obd->obd_dev_lock);
1546         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1547                                  exp_obd_chain) {
1548                 /* don't count self-export as client */
1549                 if (obd_uuid_equals(&exp->exp_client_uuid,
1550                                     &exp->exp_obd->obd_uuid))
1551                         continue;
1552
1553                 /* don't evict clients which have no slot in last_rcvd
1554                  * (e.g. lightweight connection) */
1555                 if (exp->exp_target_data.ted_lr_idx == -1)
1556                         continue;
1557
1558                 spin_lock(&exp->exp_lock);
1559                 if (exp->exp_failed || test_export(exp)) {
1560                         spin_unlock(&exp->exp_lock);
1561                         continue;
1562                 }
1563                 exp->exp_failed = 1;
1564                 spin_unlock(&exp->exp_lock);
1565
1566                 list_move(&exp->exp_obd_chain, &work_list);
1567                 evicted++;
1568                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1569                        obd->obd_name, exp->exp_client_uuid.uuid,
1570                        obd_export_nid2str(exp));
1571                 print_export_data(exp, "EVICTING", 0, D_HA);
1572         }
1573         spin_unlock(&obd->obd_dev_lock);
1574
1575         if (evicted)
1576                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1577                               obd->obd_name, evicted);
1578
1579         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1580                                                  OBD_OPT_ABORT_RECOV);
1581         EXIT;
1582 }
1583 EXPORT_SYMBOL(class_disconnect_stale_exports);
1584
1585 void class_fail_export(struct obd_export *exp)
1586 {
1587         int rc, already_failed;
1588
1589         spin_lock(&exp->exp_lock);
1590         already_failed = exp->exp_failed;
1591         exp->exp_failed = 1;
1592         spin_unlock(&exp->exp_lock);
1593
1594         if (already_failed) {
1595                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1596                        exp, exp->exp_client_uuid.uuid);
1597                 return;
1598         }
1599
1600         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1601                exp, exp->exp_client_uuid.uuid);
1602
1603         if (obd_dump_on_timeout)
1604                 libcfs_debug_dumplog();
1605
1606         /* need for safe call CDEBUG after obd_disconnect */
1607         class_export_get(exp);
1608
1609         /* Most callers into obd_disconnect are removing their own reference
1610          * (request, for example) in addition to the one from the hash table.
1611          * We don't have such a reference here, so make one. */
1612         class_export_get(exp);
1613         rc = obd_disconnect(exp);
1614         if (rc)
1615                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1616         else
1617                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1618                        exp, exp->exp_client_uuid.uuid);
1619         class_export_put(exp);
1620 }
1621 EXPORT_SYMBOL(class_fail_export);
1622
1623 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1624 {
1625         struct cfs_hash *nid_hash;
1626         struct obd_export *doomed_exp = NULL;
1627         int exports_evicted = 0;
1628
1629         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1630
1631         spin_lock(&obd->obd_dev_lock);
1632         /* umount has run already, so evict thread should leave
1633          * its task to umount thread now */
1634         if (obd->obd_stopping) {
1635                 spin_unlock(&obd->obd_dev_lock);
1636                 return exports_evicted;
1637         }
1638         nid_hash = obd->obd_nid_hash;
1639         cfs_hash_getref(nid_hash);
1640         spin_unlock(&obd->obd_dev_lock);
1641
1642         do {
1643                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1644                 if (doomed_exp == NULL)
1645                         break;
1646
1647                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1648                          "nid %s found, wanted nid %s, requested nid %s\n",
1649                          obd_export_nid2str(doomed_exp),
1650                          libcfs_nid2str(nid_key), nid);
1651                 LASSERTF(doomed_exp != obd->obd_self_export,
1652                          "self-export is hashed by NID?\n");
1653                 exports_evicted++;
1654                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1655                               "request\n", obd->obd_name,
1656                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1657                               obd_export_nid2str(doomed_exp));
1658                 class_fail_export(doomed_exp);
1659                 class_export_put(doomed_exp);
1660         } while (1);
1661
1662         cfs_hash_putref(nid_hash);
1663
1664         if (!exports_evicted)
1665                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1666                        obd->obd_name, nid);
1667         return exports_evicted;
1668 }
1669 EXPORT_SYMBOL(obd_export_evict_by_nid);
1670
1671 #ifdef HAVE_SERVER_SUPPORT
1672 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1673 {
1674         struct obd_export *doomed_exp = NULL;
1675         struct obd_uuid doomed_uuid;
1676         int exports_evicted = 0;
1677
1678         spin_lock(&obd->obd_dev_lock);
1679         if (obd->obd_stopping) {
1680                 spin_unlock(&obd->obd_dev_lock);
1681                 return exports_evicted;
1682         }
1683         spin_unlock(&obd->obd_dev_lock);
1684
1685         obd_str2uuid(&doomed_uuid, uuid);
1686         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1687                 CERROR("%s: can't evict myself\n", obd->obd_name);
1688                 return exports_evicted;
1689         }
1690
1691         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1692         if (doomed_exp == NULL) {
1693                 CERROR("%s: can't disconnect %s: no exports found\n",
1694                        obd->obd_name, uuid);
1695         } else {
1696                 CWARN("%s: evicting %s at adminstrative request\n",
1697                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1698                 class_fail_export(doomed_exp);
1699                 class_export_put(doomed_exp);
1700                 obd_uuid_del(obd, doomed_exp);
1701                 exports_evicted++;
1702         }
1703
1704         return exports_evicted;
1705 }
1706 #endif /* HAVE_SERVER_SUPPORT */
1707
1708 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1709 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1710 EXPORT_SYMBOL(class_export_dump_hook);
1711 #endif
1712
1713 static void print_export_data(struct obd_export *exp, const char *status,
1714                               int locks, int debug_level)
1715 {
1716         struct ptlrpc_reply_state *rs;
1717         struct ptlrpc_reply_state *first_reply = NULL;
1718         int nreplies = 0;
1719
1720         spin_lock(&exp->exp_lock);
1721         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1722                             rs_exp_list) {
1723                 if (nreplies == 0)
1724                         first_reply = rs;
1725                 nreplies++;
1726         }
1727         spin_unlock(&exp->exp_lock);
1728
1729         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1730                "%p %s %llu stale:%d\n",
1731                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1732                obd_export_nid2str(exp),
1733                refcount_read(&exp->exp_handle.h_ref),
1734                atomic_read(&exp->exp_rpc_count),
1735                atomic_read(&exp->exp_cb_count),
1736                atomic_read(&exp->exp_locks_count),
1737                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1738                nreplies, first_reply, nreplies > 3 ? "..." : "",
1739                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1740 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1741         if (locks && class_export_dump_hook != NULL)
1742                 class_export_dump_hook(exp);
1743 #endif
1744 }
1745
1746 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1747 {
1748         struct obd_export *exp;
1749
1750         spin_lock(&obd->obd_dev_lock);
1751         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1752                 print_export_data(exp, "ACTIVE", locks, debug_level);
1753         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1754                 print_export_data(exp, "UNLINKED", locks, debug_level);
1755         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1756                 print_export_data(exp, "DELAYED", locks, debug_level);
1757         spin_unlock(&obd->obd_dev_lock);
1758 }
1759
1760 void obd_exports_barrier(struct obd_device *obd)
1761 {
1762         int waited = 2;
1763         LASSERT(list_empty(&obd->obd_exports));
1764         spin_lock(&obd->obd_dev_lock);
1765         while (!list_empty(&obd->obd_unlinked_exports)) {
1766                 spin_unlock(&obd->obd_dev_lock);
1767                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1768                 if (waited > 5 && is_power_of_2(waited)) {
1769                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1770                                       "more than %d seconds. "
1771                                       "The obd refcount = %d. Is it stuck?\n",
1772                                       obd->obd_name, waited,
1773                                       atomic_read(&obd->obd_refcount));
1774                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1775                 }
1776                 waited *= 2;
1777                 spin_lock(&obd->obd_dev_lock);
1778         }
1779         spin_unlock(&obd->obd_dev_lock);
1780 }
1781 EXPORT_SYMBOL(obd_exports_barrier);
1782
1783 /**
1784  * Add export to the obd_zombe thread and notify it.
1785  */
1786 static void obd_zombie_export_add(struct obd_export *exp) {
1787         atomic_dec(&obd_stale_export_num);
1788         spin_lock(&exp->exp_obd->obd_dev_lock);
1789         LASSERT(!list_empty(&exp->exp_obd_chain));
1790         list_del_init(&exp->exp_obd_chain);
1791         spin_unlock(&exp->exp_obd->obd_dev_lock);
1792
1793         queue_work(zombie_wq, &exp->exp_zombie_work);
1794 }
1795
1796 /**
1797  * Add import to the obd_zombe thread and notify it.
1798  */
1799 static void obd_zombie_import_add(struct obd_import *imp) {
1800         LASSERT(imp->imp_sec == NULL);
1801
1802         queue_work(zombie_wq, &imp->imp_zombie_work);
1803 }
1804
1805 /**
1806  * wait when obd_zombie import/export queues become empty
1807  */
1808 void obd_zombie_barrier(void)
1809 {
1810         flush_workqueue(zombie_wq);
1811 }
1812 EXPORT_SYMBOL(obd_zombie_barrier);
1813
1814
1815 struct obd_export *obd_stale_export_get(void)
1816 {
1817         struct obd_export *exp = NULL;
1818         ENTRY;
1819
1820         spin_lock(&obd_stale_export_lock);
1821         if (!list_empty(&obd_stale_exports)) {
1822                 exp = list_first_entry(&obd_stale_exports,
1823                                        struct obd_export, exp_stale_list);
1824                 list_del_init(&exp->exp_stale_list);
1825         }
1826         spin_unlock(&obd_stale_export_lock);
1827
1828         if (exp) {
1829                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1830                        atomic_read(&obd_stale_export_num));
1831         }
1832         RETURN(exp);
1833 }
1834 EXPORT_SYMBOL(obd_stale_export_get);
1835
1836 void obd_stale_export_put(struct obd_export *exp)
1837 {
1838         ENTRY;
1839
1840         LASSERT(list_empty(&exp->exp_stale_list));
1841         if (exp->exp_lock_hash &&
1842             atomic_read(&exp->exp_lock_hash->hs_count)) {
1843                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1844                        atomic_read(&obd_stale_export_num));
1845
1846                 spin_lock_bh(&exp->exp_bl_list_lock);
1847                 spin_lock(&obd_stale_export_lock);
1848                 /* Add to the tail if there is no blocked locks,
1849                  * to the head otherwise. */
1850                 if (list_empty(&exp->exp_bl_list))
1851                         list_add_tail(&exp->exp_stale_list,
1852                                       &obd_stale_exports);
1853                 else
1854                         list_add(&exp->exp_stale_list,
1855                                  &obd_stale_exports);
1856
1857                 spin_unlock(&obd_stale_export_lock);
1858                 spin_unlock_bh(&exp->exp_bl_list_lock);
1859         } else {
1860                 class_export_put(exp);
1861         }
1862         EXIT;
1863 }
1864 EXPORT_SYMBOL(obd_stale_export_put);
1865
1866 /**
1867  * Adjust the position of the export in the stale list,
1868  * i.e. move to the head of the list if is needed.
1869  **/
1870 void obd_stale_export_adjust(struct obd_export *exp)
1871 {
1872         LASSERT(exp != NULL);
1873         spin_lock_bh(&exp->exp_bl_list_lock);
1874         spin_lock(&obd_stale_export_lock);
1875
1876         if (!list_empty(&exp->exp_stale_list) &&
1877             !list_empty(&exp->exp_bl_list))
1878                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1879
1880         spin_unlock(&obd_stale_export_lock);
1881         spin_unlock_bh(&exp->exp_bl_list_lock);
1882 }
1883 EXPORT_SYMBOL(obd_stale_export_adjust);
1884
1885 /**
1886  * start destroy zombie import/export thread
1887  */
1888 int obd_zombie_impexp_init(void)
1889 {
1890         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1891         if (!zombie_wq)
1892                 return -ENOMEM;
1893
1894         return 0;
1895 }
1896
1897 /**
1898  * stop destroy zombie import/export thread
1899  */
1900 void obd_zombie_impexp_stop(void)
1901 {
1902         destroy_workqueue(zombie_wq);
1903         LASSERT(list_empty(&obd_stale_exports));
1904 }
1905
1906 /***** Kernel-userspace comm helpers *******/
1907
1908 /* Get length of entire message, including header */
1909 int kuc_len(int payload_len)
1910 {
1911         return sizeof(struct kuc_hdr) + payload_len;
1912 }
1913 EXPORT_SYMBOL(kuc_len);
1914
1915 /* Get a pointer to kuc header, given a ptr to the payload
1916  * @param p Pointer to payload area
1917  * @returns Pointer to kuc header
1918  */
1919 struct kuc_hdr * kuc_ptr(void *p)
1920 {
1921         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1922         LASSERT(lh->kuc_magic == KUC_MAGIC);
1923         return lh;
1924 }
1925 EXPORT_SYMBOL(kuc_ptr);
1926
1927 /* Alloc space for a message, and fill in header
1928  * @return Pointer to payload area
1929  */
1930 void *kuc_alloc(int payload_len, int transport, int type)
1931 {
1932         struct kuc_hdr *lh;
1933         int len = kuc_len(payload_len);
1934
1935         OBD_ALLOC(lh, len);
1936         if (lh == NULL)
1937                 return ERR_PTR(-ENOMEM);
1938
1939         lh->kuc_magic = KUC_MAGIC;
1940         lh->kuc_transport = transport;
1941         lh->kuc_msgtype = type;
1942         lh->kuc_msglen = len;
1943
1944         return (void *)(lh + 1);
1945 }
1946 EXPORT_SYMBOL(kuc_alloc);
1947
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload length the message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1955
1956 struct obd_request_slot_waiter {
1957         struct list_head        orsw_entry;
1958         wait_queue_head_t       orsw_waitq;
1959         bool                    orsw_signaled;
1960 };
1961
1962 static bool obd_request_slot_avail(struct client_obd *cli,
1963                                    struct obd_request_slot_waiter *orsw)
1964 {
1965         bool avail;
1966
1967         spin_lock(&cli->cl_loi_list_lock);
1968         avail = !!list_empty(&orsw->orsw_entry);
1969         spin_unlock(&cli->cl_loi_list_lock);
1970
1971         return avail;
1972 };
1973
1974 /*
1975  * For network flow control, the RPC sponsor needs to acquire a credit
1976  * before sending the RPC. The credits count for a connection is defined
1977  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1978  * the subsequent RPC sponsors need to wait until others released their
1979  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1980  */
1981 int obd_get_request_slot(struct client_obd *cli)
1982 {
1983         struct obd_request_slot_waiter   orsw;
1984         int                              rc;
1985
1986         spin_lock(&cli->cl_loi_list_lock);
1987         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1988                 cli->cl_rpcs_in_flight++;
1989                 spin_unlock(&cli->cl_loi_list_lock);
1990                 return 0;
1991         }
1992
1993         init_waitqueue_head(&orsw.orsw_waitq);
1994         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1995         orsw.orsw_signaled = false;
1996         spin_unlock(&cli->cl_loi_list_lock);
1997
1998         rc = l_wait_event_abortable(orsw.orsw_waitq,
1999                                     obd_request_slot_avail(cli, &orsw) ||
2000                                     orsw.orsw_signaled);
2001
2002         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2003          * freed but other (such as obd_put_request_slot) is using it. */
2004         spin_lock(&cli->cl_loi_list_lock);
2005         if (rc != 0) {
2006                 if (!orsw.orsw_signaled) {
2007                         if (list_empty(&orsw.orsw_entry))
2008                                 cli->cl_rpcs_in_flight--;
2009                         else
2010                                 list_del(&orsw.orsw_entry);
2011                 }
2012                 rc = -EINTR;
2013         }
2014
2015         if (orsw.orsw_signaled) {
2016                 LASSERT(list_empty(&orsw.orsw_entry));
2017
2018                 rc = -EINTR;
2019         }
2020         spin_unlock(&cli->cl_loi_list_lock);
2021
2022         return rc;
2023 }
2024 EXPORT_SYMBOL(obd_get_request_slot);
2025
2026 void obd_put_request_slot(struct client_obd *cli)
2027 {
2028         struct obd_request_slot_waiter *orsw;
2029
2030         spin_lock(&cli->cl_loi_list_lock);
2031         cli->cl_rpcs_in_flight--;
2032
2033         /* If there is free slot, wakeup the first waiter. */
2034         if (!list_empty(&cli->cl_flight_waiters) &&
2035             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2036                 orsw = list_first_entry(&cli->cl_flight_waiters,
2037                                         struct obd_request_slot_waiter,
2038                                         orsw_entry);
2039                 list_del_init(&orsw->orsw_entry);
2040                 cli->cl_rpcs_in_flight++;
2041                 wake_up(&orsw->orsw_waitq);
2042         }
2043         spin_unlock(&cli->cl_loi_list_lock);
2044 }
2045 EXPORT_SYMBOL(obd_put_request_slot);
2046
2047 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2048 {
2049         return cli->cl_max_rpcs_in_flight;
2050 }
2051 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2052
2053 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2054 {
2055         struct obd_request_slot_waiter *orsw;
2056         __u32                           old;
2057         int                             diff;
2058         int                             i;
2059         const char *type_name;
2060         int                             rc;
2061
2062         if (max > OBD_MAX_RIF_MAX || max < 1)
2063                 return -ERANGE;
2064
2065         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2066         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2067                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2068                  * strictly lower that max_rpcs_in_flight */
2069                 if (max < 2) {
2070                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2071                                "because it must be higher than "
2072                                "max_mod_rpcs_in_flight value",
2073                                cli->cl_import->imp_obd->obd_name);
2074                         return -ERANGE;
2075                 }
2076                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2077                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2078                         if (rc != 0)
2079                                 return rc;
2080                 }
2081         }
2082
2083         spin_lock(&cli->cl_loi_list_lock);
2084         old = cli->cl_max_rpcs_in_flight;
2085         cli->cl_max_rpcs_in_flight = max;
2086         client_adjust_max_dirty(cli);
2087
2088         diff = max - old;
2089
2090         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2091         for (i = 0; i < diff; i++) {
2092                 if (list_empty(&cli->cl_flight_waiters))
2093                         break;
2094
2095                 orsw = list_first_entry(&cli->cl_flight_waiters,
2096                                         struct obd_request_slot_waiter,
2097                                         orsw_entry);
2098                 list_del_init(&orsw->orsw_entry);
2099                 cli->cl_rpcs_in_flight++;
2100                 wake_up(&orsw->orsw_waitq);
2101         }
2102         spin_unlock(&cli->cl_loi_list_lock);
2103
2104         return 0;
2105 }
2106 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2107
2108 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2109 {
2110         return cli->cl_max_mod_rpcs_in_flight;
2111 }
2112 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2113
2114 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2115 {
2116         struct obd_connect_data *ocd;
2117         __u16 maxmodrpcs;
2118         __u16 prev;
2119
2120         if (max > OBD_MAX_RIF_MAX || max < 1)
2121                 return -ERANGE;
2122
2123         /* cannot exceed or equal max_rpcs_in_flight */
2124         if (max >= cli->cl_max_rpcs_in_flight) {
2125                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2126                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2127                        cli->cl_import->imp_obd->obd_name,
2128                        max, cli->cl_max_rpcs_in_flight);
2129                 return -ERANGE;
2130         }
2131
2132         /* cannot exceed max modify RPCs in flight supported by the server */
2133         ocd = &cli->cl_import->imp_connect_data;
2134         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2135                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2136         else
2137                 maxmodrpcs = 1;
2138         if (max > maxmodrpcs) {
2139                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2140                        "higher than max_mod_rpcs_per_client value (%hu) "
2141                        "returned by the server at connection\n",
2142                        cli->cl_import->imp_obd->obd_name,
2143                        max, maxmodrpcs);
2144                 return -ERANGE;
2145         }
2146
2147         spin_lock(&cli->cl_mod_rpcs_lock);
2148
2149         prev = cli->cl_max_mod_rpcs_in_flight;
2150         cli->cl_max_mod_rpcs_in_flight = max;
2151
2152         /* wakeup waiters if limit has been increased */
2153         if (cli->cl_max_mod_rpcs_in_flight > prev)
2154                 wake_up(&cli->cl_mod_rpcs_waitq);
2155
2156         spin_unlock(&cli->cl_mod_rpcs_lock);
2157
2158         return 0;
2159 }
2160 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2161
2162 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2163                                struct seq_file *seq)
2164 {
2165         unsigned long mod_tot = 0, mod_cum;
2166         struct timespec64 now;
2167         int i;
2168
2169         ktime_get_real_ts64(&now);
2170
2171         spin_lock(&cli->cl_mod_rpcs_lock);
2172
2173         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2174                    (s64)now.tv_sec, now.tv_nsec);
2175         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2176                    cli->cl_mod_rpcs_in_flight);
2177
2178         seq_printf(seq, "\n\t\t\tmodify\n");
2179         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2180
2181         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2182
2183         mod_cum = 0;
2184         for (i = 0; i < OBD_HIST_MAX; i++) {
2185                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2186                 mod_cum += mod;
2187                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2188                            i, mod, pct(mod, mod_tot),
2189                            pct(mod_cum, mod_tot));
2190                 if (mod_cum == mod_tot)
2191                         break;
2192         }
2193
2194         spin_unlock(&cli->cl_mod_rpcs_lock);
2195
2196         return 0;
2197 }
2198 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2199
2200 /* The number of modify RPCs sent in parallel is limited
2201  * because the server has a finite number of slots per client to
2202  * store request result and ensure reply reconstruction when needed.
2203  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2204  * that takes into account server limit and cl_max_rpcs_in_flight
2205  * value.
2206  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2207  * one close request is allowed above the maximum.
2208  */
2209 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2210                                                  bool close_req)
2211 {
2212         bool avail;
2213
2214         /* A slot is available if
2215          * - number of modify RPCs in flight is less than the max
2216          * - it's a close RPC and no other close request is in flight
2217          */
2218         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2219                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2220
2221         return avail;
2222 }
2223
2224 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2225                                          bool close_req)
2226 {
2227         bool avail;
2228
2229         spin_lock(&cli->cl_mod_rpcs_lock);
2230         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2231         spin_unlock(&cli->cl_mod_rpcs_lock);
2232         return avail;
2233 }
2234
2235
2236 /* Get a modify RPC slot from the obd client @cli according
2237  * to the kind of operation @opc that is going to be sent
2238  * and the intent @it of the operation if it applies.
2239  * If the maximum number of modify RPCs in flight is reached
2240  * the thread is put to sleep.
2241  * Returns the tag to be set in the request message. Tag 0
2242  * is reserved for non-modifying requests.
2243  */
2244 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2245 {
2246         bool                    close_req = false;
2247         __u16                   i, max;
2248
2249         if (opc == MDS_CLOSE)
2250                 close_req = true;
2251
2252         do {
2253                 spin_lock(&cli->cl_mod_rpcs_lock);
2254                 max = cli->cl_max_mod_rpcs_in_flight;
2255                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2256                         /* there is a slot available */
2257                         cli->cl_mod_rpcs_in_flight++;
2258                         if (close_req)
2259                                 cli->cl_close_rpcs_in_flight++;
2260                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2261                                          cli->cl_mod_rpcs_in_flight);
2262                         /* find a free tag */
2263                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2264                                                 max + 1);
2265                         LASSERT(i < OBD_MAX_RIF_MAX);
2266                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2267                         spin_unlock(&cli->cl_mod_rpcs_lock);
2268                         /* tag 0 is reserved for non-modify RPCs */
2269
2270                         CDEBUG(D_RPCTRACE,
2271                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2272                                cli->cl_import->imp_obd->obd_name,
2273                                i + 1, opc, max);
2274
2275                         return i + 1;
2276                 }
2277                 spin_unlock(&cli->cl_mod_rpcs_lock);
2278
2279                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2280                        "opc %u, max %hu\n",
2281                        cli->cl_import->imp_obd->obd_name, opc, max);
2282
2283                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2284                                           obd_mod_rpc_slot_avail(cli,
2285                                                                  close_req));
2286         } while (true);
2287 }
2288 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2289
2290 /* Put a modify RPC slot from the obd client @cli according
2291  * to the kind of operation @opc that has been sent.
2292  */
2293 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2294 {
2295         bool                    close_req = false;
2296
2297         if (tag == 0)
2298                 return;
2299
2300         if (opc == MDS_CLOSE)
2301                 close_req = true;
2302
2303         spin_lock(&cli->cl_mod_rpcs_lock);
2304         cli->cl_mod_rpcs_in_flight--;
2305         if (close_req)
2306                 cli->cl_close_rpcs_in_flight--;
2307         /* release the tag in the bitmap */
2308         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2309         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2310         spin_unlock(&cli->cl_mod_rpcs_lock);
2311         wake_up(&cli->cl_mod_rpcs_waitq);
2312 }
2313 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2314