Whamcloud - gitweb
d4feb7cea220ea57e8dcda783a4857e55a7dd0a5
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Lock taken around every lookup in, and update of, the obd_devs[] table. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's index here is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device instances are allocated from. */
static struct kmem_cache *obd_device_cachep;
/* kobject type used for every struct obd_type (defined further below). */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue running zombie export/import
 * cleanup; its users are not visible in this part of the file.
 */
static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
			      const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping (list, its lock and a counter);
 * the code using these is outside the visible part of the file.
 */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook called by class_export_destroy() to release an export's connection. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/* kobject type of every struct obd_type: attribute access goes through
 * lustre_sysfs_ops, and class_sysfs_release() runs as the release callback
 * that frees the obd_type.
 */
static struct kobj_type class_ktype = {
	.sysfs_ops	= &lustre_sysfs_ops,
	.release	= class_sysfs_release,
};
182
#ifdef HAVE_SERVER_SUPPORT
/* Create a placeholder obd_type called \a name.
 *
 * The placeholder gets a kobject in lustre_kset, a debugfs directory and,
 * if \a enable_proc is set, a proc directory.  It is flagged with
 * typ_sym_filter so that class_register_type() can later adopt it instead
 * of failing with -EEXIST.
 *
 * Returns the new type, ERR_PTR(-EEXIST) if the name is already taken,
 * ERR_PTR(-ENOMEM) on allocation failure, or the kobject_init_and_add()
 * error wrapped in ERR_PTR().
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* A failed kobject_init_and_add() still leaves an
		 * initialized kobject behind; dropping its reference runs
		 * class_sysfs_release(), which frees the type.
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			/* the proc entry is best effort: report and carry on */
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
224
/* Upper bound on the length of a type name accepted for registration. */
#define CLASS_MAX_NAME 1024

/* Register an obd type called \a name.
 *
 * \a dt_ops and \a md_ops are stored in the type.  \a vars is handed to
 * ldebugfs_add_vars() for the type's debugfs directory, \a enable_proc
 * requests a proc directory (CONFIG_PROC_FS only) and \a ldt, if non-NULL,
 * is initialized with lu_device_type_init() and published in typ_lu.
 *
 * With server support, a placeholder previously created by
 * class_add_symlinks() (typ_sym_filter set) is adopted instead of being
 * treated as a duplicate.
 *
 * Returns 0 on success, -EEXIST if the name is already registered,
 * -ENOMEM on allocation failure, or the error of the failing setup step.
 */
int class_register_type(const struct obd_ops *dt_ops,
			const struct md_ops *md_ops,
			bool enable_proc, struct lprocfs_vars *vars,
			const char *name, struct lu_device_type *ldt)
{
	struct obd_type *type;
	int rc;

	ENTRY;
	/* sanity check */
	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

	type = class_search_type(name);
	if (type) {
#ifdef HAVE_SERVER_SUPPORT
		/* a placeholder is not a duplicate: take it over, keeping
		 * the reference class_search_type() returned for now
		 */
		if (type->typ_sym_filter)
			goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
		kobject_put(&type->typ_kobj);
		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
		RETURN(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (type == NULL)
		RETURN(-ENOMEM);

	/* typ_lu holds the OBD_LU_TYPE_SETUP marker until the real value is
	 * stored below
	 */
	type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
	type->typ_kobj.kset = lustre_kset;
	kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

	type->typ_dt_ops = dt_ops;
	type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
	if (type->typ_sym_filter) {
		/* Adopted placeholder: class_add_symlinks() already added
		 * the kobject and created the debugfs directory, so drop
		 * the lookup reference and skip to the lu type setup.
		 */
		type->typ_sym_filter = false;
		kobject_put(&type->typ_kobj);
		goto setup_ldt;
	}
#endif
#ifdef CONFIG_PROC_FS
	if (enable_proc && !type->typ_procroot) {
		type->typ_procroot = lprocfs_register(name,
						      proc_lustre_root,
						      NULL, type);
		if (IS_ERR(type->typ_procroot)) {
			rc = PTR_ERR(type->typ_procroot);
			/* clear the error pointer so the release callback
			 * does not mistake it for a proc entry
			 */
			type->typ_procroot = NULL;
			GOTO(failed, rc);
		}
	}
#endif
	type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
	ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);

	rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
	if (rc)
		GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
	if (ldt) {
		rc = lu_device_type_init(ldt);
		/* publish the outcome (ldt, or NULL on failure) and wake
		 * anything sleeping on typ_lu
		 */
		smp_store_release(&type->typ_lu, rc ? NULL : ldt);
		wake_up_var(&type->typ_lu);
		if (rc)
			GOTO(failed, rc);
	}

	RETURN(0);

failed:
	/* drop the reference held on the kobject; class_ktype's release
	 * callback does the actual teardown
	 */
	kobject_put(&type->typ_kobj);

	RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
308
309 int class_unregister_type(const char *name)
310 {
311         struct obd_type *type = class_search_type(name);
312         int rc = 0;
313         ENTRY;
314
315         if (!type) {
316                 CERROR("unknown obd type\n");
317                 RETURN(-EINVAL);
318         }
319
320         if (atomic_read(&type->typ_refcnt)) {
321                 CERROR("type %s has refcount (%d)\n", name,
322                        atomic_read(&type->typ_refcnt));
323                 /* This is a bad situation, let's make the best of it */
324                 /* Remove ops, but leave the name for debugging */
325                 type->typ_dt_ops = NULL;
326                 type->typ_md_ops = NULL;
327                 GOTO(out_put, rc = -EBUSY);
328         }
329
330         /* Put the final ref */
331         kobject_put(&type->typ_kobj);
332 out_put:
333         /* Put the ref returned by class_search_type() */
334         kobject_put(&type->typ_kobj);
335
336         RETURN(rc);
337 } /* class_unregister_type */
338 EXPORT_SYMBOL(class_unregister_type);
339
340 /**
341  * Create a new obd device.
342  *
343  * Allocate the new obd_device and initialize it.
344  *
345  * \param[in] type_name obd device type string.
346  * \param[in] name      obd device name.
347  * \param[in] uuid      obd device UUID
348  *
349  * \retval newdev         pointer to created obd_device
350  * \retval ERR_PTR(errno) on error
351  */
352 struct obd_device *class_newdev(const char *type_name, const char *name,
353                                 const char *uuid)
354 {
355         struct obd_device *newdev;
356         struct obd_type *type = NULL;
357         ENTRY;
358
359         if (strlen(name) >= MAX_OBD_NAME) {
360                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
361                 RETURN(ERR_PTR(-EINVAL));
362         }
363
364         type = class_get_type(type_name);
365         if (type == NULL){
366                 CERROR("OBD: unknown type: %s\n", type_name);
367                 RETURN(ERR_PTR(-ENODEV));
368         }
369
370         newdev = obd_device_alloc();
371         if (newdev == NULL) {
372                 class_put_type(type);
373                 RETURN(ERR_PTR(-ENOMEM));
374         }
375         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
376         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
377         newdev->obd_type = type;
378         newdev->obd_minor = -1;
379
380         rwlock_init(&newdev->obd_pool_lock);
381         newdev->obd_pool_limit = 0;
382         newdev->obd_pool_slv = 0;
383
384         INIT_LIST_HEAD(&newdev->obd_exports);
385         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
386         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
387         INIT_LIST_HEAD(&newdev->obd_exports_timed);
388         INIT_LIST_HEAD(&newdev->obd_nid_stats);
389         spin_lock_init(&newdev->obd_nid_lock);
390         spin_lock_init(&newdev->obd_dev_lock);
391         mutex_init(&newdev->obd_dev_mutex);
392         spin_lock_init(&newdev->obd_osfs_lock);
393         /* newdev->obd_osfs_age must be set to a value in the distant
394          * past to guarantee a fresh statfs is fetched on mount. */
395         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396
397         /* XXX belongs in setup not attach  */
398         init_rwsem(&newdev->obd_observer_link_sem);
399         /* recovery data */
400         spin_lock_init(&newdev->obd_recovery_task_lock);
401         init_waitqueue_head(&newdev->obd_next_transno_waitq);
402         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
403         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
405         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
406         INIT_LIST_HEAD(&newdev->obd_evict_list);
407         INIT_LIST_HEAD(&newdev->obd_lwp_list);
408
409         llog_group_init(&newdev->obd_olg);
410         /* Detach drops this */
411         atomic_set(&newdev->obd_refcount, 1);
412         lu_ref_init(&newdev->obd_reference);
413         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
414
415         newdev->obd_conn_inprogress = 0;
416
417         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
418
419         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
420                newdev->obd_name, newdev);
421
422         return newdev;
423 }
424
425 /**
426  * Free obd device.
427  *
428  * \param[in] obd obd_device to be freed
429  *
430  * \retval none
431  */
432 void class_free_dev(struct obd_device *obd)
433 {
434         struct obd_type *obd_type = obd->obd_type;
435
436         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
437                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
438         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
439                  "obd %p != obd_devs[%d] %p\n",
440                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
441         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
442                  "obd_refcount should be 0, not %d\n",
443                  atomic_read(&obd->obd_refcount));
444         LASSERT(obd_type != NULL);
445
446         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
447                obd->obd_name, obd->obd_type->typ_name);
448
449         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
450                          obd->obd_name, obd->obd_uuid.uuid);
451         if (obd->obd_stopping) {
452                 int err;
453
454                 /* If we're not stopping, we were never set up */
455                 err = obd_cleanup(obd);
456                 if (err)
457                         CERROR("Cleanup %s returned %d\n",
458                                 obd->obd_name, err);
459         }
460
461         obd_device_free(obd);
462
463         class_put_type(obd_type);
464 }
465
466 /**
467  * Unregister obd device.
468  *
469  * Free slot in obd_dev[] used by \a obd.
470  *
471  * \param[in] new_obd obd_device to be unregistered
472  *
473  * \retval none
474  */
475 void class_unregister_device(struct obd_device *obd)
476 {
477         write_lock(&obd_dev_lock);
478         if (obd->obd_minor >= 0) {
479                 LASSERT(obd_devs[obd->obd_minor] == obd);
480                 obd_devs[obd->obd_minor] = NULL;
481                 obd->obd_minor = -1;
482         }
483         write_unlock(&obd_dev_lock);
484 }
485
486 /**
487  * Register obd device.
488  *
489  * Find free slot in obd_devs[], fills it with \a new_obd.
490  *
491  * \param[in] new_obd obd_device to be registered
492  *
493  * \retval 0          success
494  * \retval -EEXIST    device with this name is registered
495  * \retval -EOVERFLOW obd_devs[] is full
496  */
497 int class_register_device(struct obd_device *new_obd)
498 {
499         int ret = 0;
500         int i;
501         int new_obd_minor = 0;
502         bool minor_assign = false;
503         bool retried = false;
504
505 again:
506         write_lock(&obd_dev_lock);
507         for (i = 0; i < class_devno_max(); i++) {
508                 struct obd_device *obd = class_num2obd(i);
509
510                 if (obd != NULL &&
511                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
512
513                         if (!retried) {
514                                 write_unlock(&obd_dev_lock);
515
516                                 /* the obd_device could be waited to be
517                                  * destroyed by the "obd_zombie_impexp_thread".
518                                  */
519                                 obd_zombie_barrier();
520                                 retried = true;
521                                 goto again;
522                         }
523
524                         CERROR("%s: already exists, won't add\n",
525                                obd->obd_name);
526                         /* in case we found a free slot before duplicate */
527                         minor_assign = false;
528                         ret = -EEXIST;
529                         break;
530                 }
531                 if (!minor_assign && obd == NULL) {
532                         new_obd_minor = i;
533                         minor_assign = true;
534                 }
535         }
536
537         if (minor_assign) {
538                 new_obd->obd_minor = new_obd_minor;
539                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
540                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
541                 obd_devs[new_obd_minor] = new_obd;
542         } else {
543                 if (ret == 0) {
544                         ret = -EOVERFLOW;
545                         CERROR("%s: all %u/%u devices used, increase "
546                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
547                                i, class_devno_max(), ret);
548                 }
549         }
550         write_unlock(&obd_dev_lock);
551
552         RETURN(ret);
553 }
554
555 static int class_name2dev_nolock(const char *name)
556 {
557         int i;
558
559         if (!name)
560                 return -1;
561
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd && strcmp(name, obd->obd_name) == 0) {
566                         /* Make sure we finished attaching before we give
567                            out any references */
568                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
569                         if (obd->obd_attached) {
570                                 return i;
571                         }
572                         break;
573                 }
574         }
575
576         return -1;
577 }
578
579 int class_name2dev(const char *name)
580 {
581         int i;
582
583         if (!name)
584                 return -1;
585
586         read_lock(&obd_dev_lock);
587         i = class_name2dev_nolock(name);
588         read_unlock(&obd_dev_lock);
589
590         return i;
591 }
592 EXPORT_SYMBOL(class_name2dev);
593
594 struct obd_device *class_name2obd(const char *name)
595 {
596         int dev = class_name2dev(name);
597
598         if (dev < 0 || dev > class_devno_max())
599                 return NULL;
600         return class_num2obd(dev);
601 }
602 EXPORT_SYMBOL(class_name2obd);
603
604 int class_uuid2dev_nolock(struct obd_uuid *uuid)
605 {
606         int i;
607
608         for (i = 0; i < class_devno_max(); i++) {
609                 struct obd_device *obd = class_num2obd(i);
610
611                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
612                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
613                         return i;
614                 }
615         }
616
617         return -1;
618 }
619
620 int class_uuid2dev(struct obd_uuid *uuid)
621 {
622         int i;
623
624         read_lock(&obd_dev_lock);
625         i = class_uuid2dev_nolock(uuid);
626         read_unlock(&obd_dev_lock);
627
628         return i;
629 }
630 EXPORT_SYMBOL(class_uuid2dev);
631
632 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
633 {
634         int dev = class_uuid2dev(uuid);
635         if (dev < 0)
636                 return NULL;
637         return class_num2obd(dev);
638 }
639 EXPORT_SYMBOL(class_uuid2obd);
640
641 /**
642  * Get obd device from ::obd_devs[]
643  *
644  * \param num [in] array index
645  *
646  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
647  *         otherwise return the obd device there.
648  */
649 struct obd_device *class_num2obd(int num)
650 {
651         struct obd_device *obd = NULL;
652
653         if (num < class_devno_max()) {
654                 obd = obd_devs[num];
655                 if (obd == NULL)
656                         return NULL;
657
658                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
659                          "%p obd_magic %08x != %08x\n",
660                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
661                 LASSERTF(obd->obd_minor == num,
662                          "%p obd_minor %0d != %0d\n",
663                          obd, obd->obd_minor, num);
664         }
665
666         return obd;
667 }
668 EXPORT_SYMBOL(class_num2obd);
669
670 /**
671  * Find obd in obd_dev[] by name or uuid.
672  *
673  * Increment obd's refcount if found.
674  *
675  * \param[in] str obd name or uuid
676  *
677  * \retval NULL    if not found
678  * \retval target  pointer to found obd_device
679  */
680 struct obd_device *class_dev_by_str(const char *str)
681 {
682         struct obd_device *target = NULL;
683         struct obd_uuid tgtuuid;
684         int rc;
685
686         obd_str2uuid(&tgtuuid, str);
687
688         read_lock(&obd_dev_lock);
689         rc = class_uuid2dev_nolock(&tgtuuid);
690         if (rc < 0)
691                 rc = class_name2dev_nolock(str);
692
693         if (rc >= 0)
694                 target = class_num2obd(rc);
695
696         if (target != NULL)
697                 class_incref(target, "find", current);
698         read_unlock(&obd_dev_lock);
699
700         RETURN(target);
701 }
702 EXPORT_SYMBOL(class_dev_by_str);
703
704 /**
705  * Get obd devices count. Device in any
706  *    state are counted
707  * \retval obd device count
708  */
709 int get_devices_count(void)
710 {
711         int index, max_index = class_devno_max(), dev_count = 0;
712
713         read_lock(&obd_dev_lock);
714         for (index = 0; index <= max_index; index++) {
715                 struct obd_device *obd = class_num2obd(index);
716                 if (obd != NULL)
717                         dev_count++;
718         }
719         read_unlock(&obd_dev_lock);
720
721         return dev_count;
722 }
723 EXPORT_SYMBOL(get_devices_count);
724
725 void class_obd_list(void)
726 {
727         char *status;
728         int i;
729
730         read_lock(&obd_dev_lock);
731         for (i = 0; i < class_devno_max(); i++) {
732                 struct obd_device *obd = class_num2obd(i);
733
734                 if (obd == NULL)
735                         continue;
736                 if (obd->obd_stopping)
737                         status = "ST";
738                 else if (obd->obd_set_up)
739                         status = "UP";
740                 else if (obd->obd_attached)
741                         status = "AT";
742                 else
743                         status = "--";
744                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
745                          i, status, obd->obd_type->typ_name,
746                          obd->obd_name, obd->obd_uuid.uuid,
747                          atomic_read(&obd->obd_refcount));
748         }
749         read_unlock(&obd_dev_lock);
750 }
751
752 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
753  * specified, then only the client with that uuid is returned,
754  * otherwise any client connected to the tgt is returned.
755  */
756 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
757                                          const char *type_name,
758                                          struct obd_uuid *grp_uuid)
759 {
760         int i;
761
762         read_lock(&obd_dev_lock);
763         for (i = 0; i < class_devno_max(); i++) {
764                 struct obd_device *obd = class_num2obd(i);
765
766                 if (obd == NULL)
767                         continue;
768                 if ((strncmp(obd->obd_type->typ_name, type_name,
769                              strlen(type_name)) == 0)) {
770                         if (obd_uuid_equals(tgt_uuid,
771                                             &obd->u.cli.cl_target_uuid) &&
772                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
773                                                          &obd->obd_uuid) : 1)) {
774                                 read_unlock(&obd_dev_lock);
775                                 return obd;
776                         }
777                 }
778         }
779         read_unlock(&obd_dev_lock);
780
781         return NULL;
782 }
783 EXPORT_SYMBOL(class_find_client_obd);
784
785 /* Iterate the obd_device list looking devices have grp_uuid. Start
786  * searching at *next, and if a device is found, the next index to look
787  * at is saved in *next. If next is NULL, then the first matching device
788  * will always be returned.
789  */
790 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
791 {
792         int i;
793
794         if (next == NULL)
795                 i = 0;
796         else if (*next >= 0 && *next < class_devno_max())
797                 i = *next;
798         else
799                 return NULL;
800
801         read_lock(&obd_dev_lock);
802         for (; i < class_devno_max(); i++) {
803                 struct obd_device *obd = class_num2obd(i);
804
805                 if (obd == NULL)
806                         continue;
807                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
808                         if (next != NULL)
809                                 *next = i+1;
810                         read_unlock(&obd_dev_lock);
811                         return obd;
812                 }
813         }
814         read_unlock(&obd_dev_lock);
815
816         return NULL;
817 }
818 EXPORT_SYMBOL(class_devices_in_group);
819
820 /**
821  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
822  * adjust sptlrpc settings accordingly.
823  */
824 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
825 {
826         struct obd_device  *obd;
827         const char         *type;
828         int                 i, rc = 0, rc2;
829
830         LASSERT(namelen > 0);
831
832         read_lock(&obd_dev_lock);
833         for (i = 0; i < class_devno_max(); i++) {
834                 obd = class_num2obd(i);
835
836                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
837                         continue;
838
839                 /* only notify mdc, osc, osp, lwp, mdt, ost
840                  * because only these have a -sptlrpc llog */
841                 type = obd->obd_type->typ_name;
842                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
844                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
846                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
847                     strcmp(type, LUSTRE_OST_NAME) != 0)
848                         continue;
849
850                 if (strncmp(obd->obd_name, fsname, namelen))
851                         continue;
852
853                 class_incref(obd, __FUNCTION__, obd);
854                 read_unlock(&obd_dev_lock);
855                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
856                                          sizeof(KEY_SPTLRPC_CONF),
857                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
858                 rc = rc ? rc : rc2;
859                 class_decref(obd, __FUNCTION__, obd);
860                 read_lock(&obd_dev_lock);
861         }
862         read_unlock(&obd_dev_lock);
863         return rc;
864 }
865 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
866
867 void obd_cleanup_caches(void)
868 {
869         ENTRY;
870         if (obd_device_cachep) {
871                 kmem_cache_destroy(obd_device_cachep);
872                 obd_device_cachep = NULL;
873         }
874
875         EXIT;
876 }
877
878 int obd_init_caches(void)
879 {
880         int rc;
881         ENTRY;
882
883         LASSERT(obd_device_cachep == NULL);
884         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
885                                 sizeof(struct obd_device),
886                                 0, 0, 0, sizeof(struct obd_device), NULL);
887         if (!obd_device_cachep)
888                 GOTO(out, rc = -ENOMEM);
889
890         RETURN(0);
891 out:
892         obd_cleanup_caches();
893         RETURN(rc);
894 }
895
896 static const char export_handle_owner[] = "export";
897
898 /* map connection to client */
899 struct obd_export *class_conn2export(struct lustre_handle *conn)
900 {
901         struct obd_export *export;
902         ENTRY;
903
904         if (!conn) {
905                 CDEBUG(D_CACHE, "looking for null handle\n");
906                 RETURN(NULL);
907         }
908
909         if (conn->cookie == -1) {  /* this means assign a new connection */
910                 CDEBUG(D_CACHE, "want a new connection\n");
911                 RETURN(NULL);
912         }
913
914         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
915         export = class_handle2object(conn->cookie, export_handle_owner);
916         RETURN(export);
917 }
918 EXPORT_SYMBOL(class_conn2export);
919
920 struct obd_device *class_exp2obd(struct obd_export *exp)
921 {
922         if (exp)
923                 return exp->exp_obd;
924         return NULL;
925 }
926 EXPORT_SYMBOL(class_exp2obd);
927
928 struct obd_import *class_exp2cliimp(struct obd_export *exp)
929 {
930         struct obd_device *obd = exp->exp_obd;
931         if (obd == NULL)
932                 return NULL;
933         return obd->u.cli.cl_import;
934 }
935 EXPORT_SYMBOL(class_exp2cliimp);
936
937 /* Export management functions */
938 static void class_export_destroy(struct obd_export *exp)
939 {
940         struct obd_device *obd = exp->exp_obd;
941         ENTRY;
942
943         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
944         LASSERT(obd != NULL);
945
946         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
947                exp->exp_client_uuid.uuid, obd->obd_name);
948
949         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
950         if (exp->exp_connection)
951                 ptlrpc_put_connection_superhack(exp->exp_connection);
952
953         LASSERT(list_empty(&exp->exp_outstanding_replies));
954         LASSERT(list_empty(&exp->exp_uncommitted_replies));
955         LASSERT(list_empty(&exp->exp_req_replay_queue));
956         LASSERT(list_empty(&exp->exp_hp_rpcs));
957         obd_destroy_export(exp);
958         /* self export doesn't hold a reference to an obd, although it
959          * exists until freeing of the obd */
960         if (exp != obd->obd_self_export)
961                 class_decref(obd, "export", exp);
962
963         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
964         kfree_rcu(exp, exp_handle.h_rcu);
965         EXIT;
966 }
967
968 struct obd_export *class_export_get(struct obd_export *exp)
969 {
970         refcount_inc(&exp->exp_handle.h_ref);
971         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
972                refcount_read(&exp->exp_handle.h_ref));
973         return exp;
974 }
975 EXPORT_SYMBOL(class_export_get);
976
977 void class_export_put(struct obd_export *exp)
978 {
979         LASSERT(exp != NULL);
980         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
981         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
982         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
983                refcount_read(&exp->exp_handle.h_ref) - 1);
984
985         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
986                 struct obd_device *obd = exp->exp_obd;
987
988                 CDEBUG(D_IOCTL, "final put %p/%s\n",
989                        exp, exp->exp_client_uuid.uuid);
990
991                 /* release nid stat refererence */
992                 lprocfs_exp_cleanup(exp);
993
994                 if (exp == obd->obd_self_export) {
995                         /* self export should be destroyed without
996                          * zombie thread as it doesn't hold a
997                          * reference to obd and doesn't hold any
998                          * resources */
999                         class_export_destroy(exp);
1000                         /* self export is destroyed, no class
1001                          * references exist and it is safe to free
1002                          * obd */
1003                         class_free_dev(obd);
1004                 } else {
1005                         LASSERT(!list_empty(&exp->exp_obd_chain));
1006                         obd_zombie_export_add(exp);
1007                 }
1008
1009         }
1010 }
1011 EXPORT_SYMBOL(class_export_put);
1012
1013 static void obd_zombie_exp_cull(struct work_struct *ws)
1014 {
1015         struct obd_export *export;
1016
1017         export = container_of(ws, struct obd_export, exp_zombie_work);
1018         class_export_destroy(export);
1019 }
1020
1021 /* Creates a new export, adds it to the hash table, and returns a
1022  * pointer to it. The refcount is 2: one for the hash reference, and
1023  * one for the pointer returned by this function. */
1024 struct obd_export *__class_new_export(struct obd_device *obd,
1025                                       struct obd_uuid *cluuid, bool is_self)
1026 {
1027         struct obd_export *export;
1028         int rc = 0;
1029         ENTRY;
1030
1031         OBD_ALLOC_PTR(export);
1032         if (!export)
1033                 return ERR_PTR(-ENOMEM);
1034
1035         export->exp_conn_cnt = 0;
1036         export->exp_lock_hash = NULL;
1037         export->exp_flock_hash = NULL;
1038         /* 2 = class_handle_hash + last */
1039         refcount_set(&export->exp_handle.h_ref, 2);
1040         atomic_set(&export->exp_rpc_count, 0);
1041         atomic_set(&export->exp_cb_count, 0);
1042         atomic_set(&export->exp_locks_count, 0);
1043 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1044         INIT_LIST_HEAD(&export->exp_locks_list);
1045         spin_lock_init(&export->exp_locks_list_guard);
1046 #endif
1047         atomic_set(&export->exp_replay_count, 0);
1048         export->exp_obd = obd;
1049         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1050         spin_lock_init(&export->exp_uncommitted_replies_lock);
1051         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1052         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1053         INIT_HLIST_NODE(&export->exp_handle.h_link);
1054         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1055         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1056         class_handle_hash(&export->exp_handle, export_handle_owner);
1057         export->exp_last_request_time = ktime_get_real_seconds();
1058         spin_lock_init(&export->exp_lock);
1059         spin_lock_init(&export->exp_rpc_lock);
1060         INIT_HLIST_NODE(&export->exp_nid_hash);
1061         INIT_HLIST_NODE(&export->exp_gen_hash);
1062         spin_lock_init(&export->exp_bl_list_lock);
1063         INIT_LIST_HEAD(&export->exp_bl_list);
1064         INIT_LIST_HEAD(&export->exp_stale_list);
1065         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1066
1067         export->exp_sp_peer = LUSTRE_SP_ANY;
1068         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1069         export->exp_client_uuid = *cluuid;
1070         obd_init_export(export);
1071
1072         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1073
1074         spin_lock(&obd->obd_dev_lock);
1075         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1076                 /* shouldn't happen, but might race */
1077                 if (obd->obd_stopping)
1078                         GOTO(exit_unlock, rc = -ENODEV);
1079
1080                 rc = obd_uuid_add(obd, export);
1081                 if (rc != 0) {
1082                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1083                                       obd->obd_name, cluuid->uuid, rc);
1084                         GOTO(exit_unlock, rc = -EALREADY);
1085                 }
1086         }
1087
1088         if (!is_self) {
1089                 class_incref(obd, "export", export);
1090                 list_add_tail(&export->exp_obd_chain_timed,
1091                               &obd->obd_exports_timed);
1092                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1093                 obd->obd_num_exports++;
1094         } else {
1095                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1096                 INIT_LIST_HEAD(&export->exp_obd_chain);
1097         }
1098         spin_unlock(&obd->obd_dev_lock);
1099         RETURN(export);
1100
1101 exit_unlock:
1102         spin_unlock(&obd->obd_dev_lock);
1103         class_handle_unhash(&export->exp_handle);
1104         obd_destroy_export(export);
1105         OBD_FREE_PTR(export);
1106         return ERR_PTR(rc);
1107 }
1108
1109 struct obd_export *class_new_export(struct obd_device *obd,
1110                                     struct obd_uuid *uuid)
1111 {
1112         return __class_new_export(obd, uuid, false);
1113 }
1114 EXPORT_SYMBOL(class_new_export);
1115
1116 struct obd_export *class_new_export_self(struct obd_device *obd,
1117                                          struct obd_uuid *uuid)
1118 {
1119         return __class_new_export(obd, uuid, true);
1120 }
1121
1122 void class_unlink_export(struct obd_export *exp)
1123 {
1124         class_handle_unhash(&exp->exp_handle);
1125
1126         if (exp->exp_obd->obd_self_export == exp) {
1127                 class_export_put(exp);
1128                 return;
1129         }
1130
1131         spin_lock(&exp->exp_obd->obd_dev_lock);
1132         /* delete an uuid-export hashitem from hashtables */
1133         if (exp != exp->exp_obd->obd_self_export)
1134                 obd_uuid_del(exp->exp_obd, exp);
1135
1136 #ifdef HAVE_SERVER_SUPPORT
1137         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1138                 struct tg_export_data   *ted = &exp->exp_target_data;
1139                 struct cfs_hash         *hash;
1140
1141                 /* Because obd_gen_hash will not be released until
1142                  * class_cleanup(), so hash should never be NULL here */
1143                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1144                 LASSERT(hash != NULL);
1145                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1146                              &exp->exp_gen_hash);
1147                 cfs_hash_putref(hash);
1148         }
1149 #endif /* HAVE_SERVER_SUPPORT */
1150
1151         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1152         list_del_init(&exp->exp_obd_chain_timed);
1153         exp->exp_obd->obd_num_exports--;
1154         spin_unlock(&exp->exp_obd->obd_dev_lock);
1155         atomic_inc(&obd_stale_export_num);
1156
1157         /* A reference is kept by obd_stale_exports list */
1158         obd_stale_export_put(exp);
1159 }
1160 EXPORT_SYMBOL(class_unlink_export);
1161
1162 /* Import management functions */
1163 static void obd_zombie_import_free(struct obd_import *imp)
1164 {
1165         ENTRY;
1166
1167         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1168                 imp->imp_obd->obd_name);
1169
1170         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1171
1172         ptlrpc_put_connection_superhack(imp->imp_connection);
1173
1174         while (!list_empty(&imp->imp_conn_list)) {
1175                 struct obd_import_conn *imp_conn;
1176
1177                 imp_conn = list_first_entry(&imp->imp_conn_list,
1178                                             struct obd_import_conn, oic_item);
1179                 list_del_init(&imp_conn->oic_item);
1180                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1181                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1182         }
1183
1184         LASSERT(imp->imp_sec == NULL);
1185         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1186                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1187         class_decref(imp->imp_obd, "import", imp);
1188         OBD_FREE_PTR(imp);
1189         EXIT;
1190 }
1191
1192 struct obd_import *class_import_get(struct obd_import *import)
1193 {
1194         refcount_inc(&import->imp_refcount);
1195         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1196                refcount_read(&import->imp_refcount),
1197                import->imp_obd->obd_name);
1198         return import;
1199 }
1200 EXPORT_SYMBOL(class_import_get);
1201
1202 void class_import_put(struct obd_import *imp)
1203 {
1204         ENTRY;
1205
1206         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1207
1208         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1209                refcount_read(&imp->imp_refcount) - 1,
1210                imp->imp_obd->obd_name);
1211
1212         if (refcount_dec_and_test(&imp->imp_refcount)) {
1213                 CDEBUG(D_INFO, "final put import %p\n", imp);
1214                 obd_zombie_import_add(imp);
1215         }
1216
1217         EXIT;
1218 }
1219 EXPORT_SYMBOL(class_import_put);
1220
1221 static void init_imp_at(struct imp_at *at) {
1222         int i;
1223         at_init(&at->iat_net_latency, 0, 0);
1224         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1225                 /* max service estimates are tracked on the server side, so
1226                    don't use the AT history here, just use the last reported
1227                    val. (But keep hist for proc histogram, worst_ever) */
1228                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1229                         AT_FLG_NOHIST);
1230         }
1231 }
1232
1233 static void obd_zombie_imp_cull(struct work_struct *ws)
1234 {
1235         struct obd_import *import;
1236
1237         import = container_of(ws, struct obd_import, imp_zombie_work);
1238         obd_zombie_import_free(import);
1239 }
1240
1241 struct obd_import *class_new_import(struct obd_device *obd)
1242 {
1243         struct obd_import *imp;
1244         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1245
1246         OBD_ALLOC(imp, sizeof(*imp));
1247         if (imp == NULL)
1248                 return NULL;
1249
1250         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1251         INIT_LIST_HEAD(&imp->imp_replay_list);
1252         INIT_LIST_HEAD(&imp->imp_sending_list);
1253         INIT_LIST_HEAD(&imp->imp_delayed_list);
1254         INIT_LIST_HEAD(&imp->imp_committed_list);
1255         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1256         imp->imp_known_replied_xid = 0;
1257         imp->imp_replay_cursor = &imp->imp_committed_list;
1258         spin_lock_init(&imp->imp_lock);
1259         imp->imp_last_success_conn = 0;
1260         imp->imp_state = LUSTRE_IMP_NEW;
1261         imp->imp_obd = class_incref(obd, "import", imp);
1262         rwlock_init(&imp->imp_sec_lock);
1263         init_waitqueue_head(&imp->imp_recovery_waitq);
1264         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1265
1266         if (curr_pid_ns && curr_pid_ns->child_reaper)
1267                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1268         else
1269                 imp->imp_sec_refpid = 1;
1270
1271         refcount_set(&imp->imp_refcount, 2);
1272         atomic_set(&imp->imp_unregistering, 0);
1273         atomic_set(&imp->imp_reqs, 0);
1274         atomic_set(&imp->imp_inflight, 0);
1275         atomic_set(&imp->imp_replay_inflight, 0);
1276         atomic_set(&imp->imp_inval_count, 0);
1277         INIT_LIST_HEAD(&imp->imp_conn_list);
1278         init_imp_at(&imp->imp_at);
1279
1280         /* the default magic is V2, will be used in connect RPC, and
1281          * then adjusted according to the flags in request/reply. */
1282         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1283
1284         return imp;
1285 }
1286 EXPORT_SYMBOL(class_new_import);
1287
1288 void class_destroy_import(struct obd_import *import)
1289 {
1290         LASSERT(import != NULL);
1291         LASSERT(import != LP_POISON);
1292
1293         spin_lock(&import->imp_lock);
1294         import->imp_generation++;
1295         spin_unlock(&import->imp_lock);
1296         class_import_put(import);
1297 }
1298 EXPORT_SYMBOL(class_destroy_import);
1299
1300 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1301
1302 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1303 {
1304         spin_lock(&exp->exp_locks_list_guard);
1305
1306         LASSERT(lock->l_exp_refs_nr >= 0);
1307
1308         if (lock->l_exp_refs_target != NULL &&
1309             lock->l_exp_refs_target != exp) {
1310                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1311                               exp, lock, lock->l_exp_refs_target);
1312         }
1313         if ((lock->l_exp_refs_nr ++) == 0) {
1314                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1315                 lock->l_exp_refs_target = exp;
1316         }
1317         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1318                lock, exp, lock->l_exp_refs_nr);
1319         spin_unlock(&exp->exp_locks_list_guard);
1320 }
1321 EXPORT_SYMBOL(__class_export_add_lock_ref);
1322
1323 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1324 {
1325         spin_lock(&exp->exp_locks_list_guard);
1326         LASSERT(lock->l_exp_refs_nr > 0);
1327         if (lock->l_exp_refs_target != exp) {
1328                 LCONSOLE_WARN("lock %p, "
1329                               "mismatching export pointers: %p, %p\n",
1330                               lock, lock->l_exp_refs_target, exp);
1331         }
1332         if (-- lock->l_exp_refs_nr == 0) {
1333                 list_del_init(&lock->l_exp_refs_link);
1334                 lock->l_exp_refs_target = NULL;
1335         }
1336         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1337                lock, exp, lock->l_exp_refs_nr);
1338         spin_unlock(&exp->exp_locks_list_guard);
1339 }
1340 EXPORT_SYMBOL(__class_export_del_lock_ref);
1341 #endif
1342
1343 /* A connection defines an export context in which preallocation can
1344    be managed. This releases the export pointer reference, and returns
1345    the export handle, so the export refcount is 1 when this function
1346    returns. */
1347 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1348                   struct obd_uuid *cluuid)
1349 {
1350         struct obd_export *export;
1351         LASSERT(conn != NULL);
1352         LASSERT(obd != NULL);
1353         LASSERT(cluuid != NULL);
1354         ENTRY;
1355
1356         export = class_new_export(obd, cluuid);
1357         if (IS_ERR(export))
1358                 RETURN(PTR_ERR(export));
1359
1360         conn->cookie = export->exp_handle.h_cookie;
1361         class_export_put(export);
1362
1363         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1364                cluuid->uuid, conn->cookie);
1365         RETURN(0);
1366 }
1367 EXPORT_SYMBOL(class_connect);
1368
1369 /* if export is involved in recovery then clean up related things */
1370 static void class_export_recovery_cleanup(struct obd_export *exp)
1371 {
1372         struct obd_device *obd = exp->exp_obd;
1373
1374         spin_lock(&obd->obd_recovery_task_lock);
1375         if (obd->obd_recovering) {
1376                 if (exp->exp_in_recovery) {
1377                         spin_lock(&exp->exp_lock);
1378                         exp->exp_in_recovery = 0;
1379                         spin_unlock(&exp->exp_lock);
1380                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1381                         atomic_dec(&obd->obd_connected_clients);
1382                 }
1383
1384                 /* if called during recovery then should update
1385                  * obd_stale_clients counter,
1386                  * lightweight exports are not counted */
1387                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1388                         exp->exp_obd->obd_stale_clients++;
1389         }
1390         spin_unlock(&obd->obd_recovery_task_lock);
1391
1392         spin_lock(&exp->exp_lock);
1393         /** Cleanup req replay fields */
1394         if (exp->exp_req_replay_needed) {
1395                 exp->exp_req_replay_needed = 0;
1396
1397                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1398                 atomic_dec(&obd->obd_req_replay_clients);
1399         }
1400
1401         /** Cleanup lock replay data */
1402         if (exp->exp_lock_replay_needed) {
1403                 exp->exp_lock_replay_needed = 0;
1404
1405                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1406                 atomic_dec(&obd->obd_lock_replay_clients);
1407         }
1408         spin_unlock(&exp->exp_lock);
1409 }
1410
1411 /* This function removes 1-3 references from the export:
1412  * 1 - for export pointer passed
1413  * and if disconnect really need
1414  * 2 - removing from hash
1415  * 3 - in client_unlink_export
1416  * The export pointer passed to this function can destroyed */
1417 int class_disconnect(struct obd_export *export)
1418 {
1419         int already_disconnected;
1420         ENTRY;
1421
1422         if (export == NULL) {
1423                 CWARN("attempting to free NULL export %p\n", export);
1424                 RETURN(-EINVAL);
1425         }
1426
1427         spin_lock(&export->exp_lock);
1428         already_disconnected = export->exp_disconnected;
1429         export->exp_disconnected = 1;
1430         /*  We hold references of export for uuid hash
1431          *  and nid_hash and export link at least. So
1432          *  it is safe to call cfs_hash_del in there.  */
1433         if (!hlist_unhashed(&export->exp_nid_hash))
1434                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1435                              &export->exp_connection->c_peer.nid,
1436                              &export->exp_nid_hash);
1437         spin_unlock(&export->exp_lock);
1438
1439         /* class_cleanup(), abort_recovery(), and class_fail_export()
1440          * all end up in here, and if any of them race we shouldn't
1441          * call extra class_export_puts(). */
1442         if (already_disconnected) {
1443                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1444                 GOTO(no_disconn, already_disconnected);
1445         }
1446
1447         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1448                export->exp_handle.h_cookie);
1449
1450         class_export_recovery_cleanup(export);
1451         class_unlink_export(export);
1452 no_disconn:
1453         class_export_put(export);
1454         RETURN(0);
1455 }
1456 EXPORT_SYMBOL(class_disconnect);
1457
1458 /* Return non-zero for a fully connected export */
1459 int class_connected_export(struct obd_export *exp)
1460 {
1461         int connected = 0;
1462
1463         if (exp) {
1464                 spin_lock(&exp->exp_lock);
1465                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1466                 spin_unlock(&exp->exp_lock);
1467         }
1468         return connected;
1469 }
1470 EXPORT_SYMBOL(class_connected_export);
1471
1472 static void class_disconnect_export_list(struct list_head *list,
1473                                          enum obd_option flags)
1474 {
1475         int rc;
1476         struct obd_export *exp;
1477         ENTRY;
1478
1479         /* It's possible that an export may disconnect itself, but
1480          * nothing else will be added to this list. */
1481         while (!list_empty(list)) {
1482                 exp = list_first_entry(list, struct obd_export,
1483                                        exp_obd_chain);
1484                 /* need for safe call CDEBUG after obd_disconnect */
1485                 class_export_get(exp);
1486
1487                 spin_lock(&exp->exp_lock);
1488                 exp->exp_flags = flags;
1489                 spin_unlock(&exp->exp_lock);
1490
1491                 if (obd_uuid_equals(&exp->exp_client_uuid,
1492                                     &exp->exp_obd->obd_uuid)) {
1493                         CDEBUG(D_HA,
1494                                "exp %p export uuid == obd uuid, don't discon\n",
1495                                exp);
1496                         /* Need to delete this now so we don't end up pointing
1497                          * to work_list later when this export is cleaned up. */
1498                         list_del_init(&exp->exp_obd_chain);
1499                         class_export_put(exp);
1500                         continue;
1501                 }
1502
1503                 class_export_get(exp);
1504                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1505                        "last request at %lld\n",
1506                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1507                        exp, exp->exp_last_request_time);
1508                 /* release one export reference anyway */
1509                 rc = obd_disconnect(exp);
1510
1511                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1512                        obd_export_nid2str(exp), exp, rc);
1513                 class_export_put(exp);
1514         }
1515         EXIT;
1516 }
1517
1518 void class_disconnect_exports(struct obd_device *obd)
1519 {
1520         LIST_HEAD(work_list);
1521         ENTRY;
1522
1523         /* Move all of the exports from obd_exports to a work list, en masse. */
1524         spin_lock(&obd->obd_dev_lock);
1525         list_splice_init(&obd->obd_exports, &work_list);
1526         list_splice_init(&obd->obd_delayed_exports, &work_list);
1527         spin_unlock(&obd->obd_dev_lock);
1528
1529         if (!list_empty(&work_list)) {
1530                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1531                        "disconnecting them\n", obd->obd_minor, obd);
1532                 class_disconnect_export_list(&work_list,
1533                                              exp_flags_from_obd(obd));
1534         } else
1535                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1536                        obd->obd_minor, obd);
1537         EXIT;
1538 }
1539 EXPORT_SYMBOL(class_disconnect_exports);
1540
1541 /* Remove exports that have not completed recovery.
1542  */
1543 void class_disconnect_stale_exports(struct obd_device *obd,
1544                                     int (*test_export)(struct obd_export *))
1545 {
1546         LIST_HEAD(work_list);
1547         struct obd_export *exp, *n;
1548         int evicted = 0;
1549         ENTRY;
1550
1551         spin_lock(&obd->obd_dev_lock);
1552         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1553                                  exp_obd_chain) {
1554                 /* don't count self-export as client */
1555                 if (obd_uuid_equals(&exp->exp_client_uuid,
1556                                     &exp->exp_obd->obd_uuid))
1557                         continue;
1558
1559                 /* don't evict clients which have no slot in last_rcvd
1560                  * (e.g. lightweight connection) */
1561                 if (exp->exp_target_data.ted_lr_idx == -1)
1562                         continue;
1563
1564                 spin_lock(&exp->exp_lock);
1565                 if (exp->exp_failed || test_export(exp)) {
1566                         spin_unlock(&exp->exp_lock);
1567                         continue;
1568                 }
1569                 exp->exp_failed = 1;
1570                 spin_unlock(&exp->exp_lock);
1571
1572                 list_move(&exp->exp_obd_chain, &work_list);
1573                 evicted++;
1574                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1575                        obd->obd_name, exp->exp_client_uuid.uuid,
1576                        obd_export_nid2str(exp));
1577                 print_export_data(exp, "EVICTING", 0, D_HA);
1578         }
1579         spin_unlock(&obd->obd_dev_lock);
1580
1581         if (evicted)
1582                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1583                               obd->obd_name, evicted);
1584
1585         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1586                                                  OBD_OPT_ABORT_RECOV);
1587         EXIT;
1588 }
1589 EXPORT_SYMBOL(class_disconnect_stale_exports);
1590
1591 void class_fail_export(struct obd_export *exp)
1592 {
1593         int rc, already_failed;
1594
1595         spin_lock(&exp->exp_lock);
1596         already_failed = exp->exp_failed;
1597         exp->exp_failed = 1;
1598         spin_unlock(&exp->exp_lock);
1599
1600         if (already_failed) {
1601                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1602                        exp, exp->exp_client_uuid.uuid);
1603                 return;
1604         }
1605
1606         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1607                exp, exp->exp_client_uuid.uuid);
1608
1609         if (obd_dump_on_timeout)
1610                 libcfs_debug_dumplog();
1611
1612         /* need for safe call CDEBUG after obd_disconnect */
1613         class_export_get(exp);
1614
1615         /* Most callers into obd_disconnect are removing their own reference
1616          * (request, for example) in addition to the one from the hash table.
1617          * We don't have such a reference here, so make one. */
1618         class_export_get(exp);
1619         rc = obd_disconnect(exp);
1620         if (rc)
1621                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1622         else
1623                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1624                        exp, exp->exp_client_uuid.uuid);
1625         class_export_put(exp);
1626 }
1627 EXPORT_SYMBOL(class_fail_export);
1628
1629 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1630 {
1631         struct cfs_hash *nid_hash;
1632         struct obd_export *doomed_exp = NULL;
1633         int exports_evicted = 0;
1634
1635         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1636
1637         spin_lock(&obd->obd_dev_lock);
1638         /* umount has run already, so evict thread should leave
1639          * its task to umount thread now */
1640         if (obd->obd_stopping) {
1641                 spin_unlock(&obd->obd_dev_lock);
1642                 return exports_evicted;
1643         }
1644         nid_hash = obd->obd_nid_hash;
1645         cfs_hash_getref(nid_hash);
1646         spin_unlock(&obd->obd_dev_lock);
1647
1648         do {
1649                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1650                 if (doomed_exp == NULL)
1651                         break;
1652
1653                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1654                          "nid %s found, wanted nid %s, requested nid %s\n",
1655                          obd_export_nid2str(doomed_exp),
1656                          libcfs_nid2str(nid_key), nid);
1657                 LASSERTF(doomed_exp != obd->obd_self_export,
1658                          "self-export is hashed by NID?\n");
1659                 exports_evicted++;
1660                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1661                               "request\n", obd->obd_name,
1662                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1663                               obd_export_nid2str(doomed_exp));
1664                 class_fail_export(doomed_exp);
1665                 class_export_put(doomed_exp);
1666         } while (1);
1667
1668         cfs_hash_putref(nid_hash);
1669
1670         if (!exports_evicted)
1671                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1672                        obd->obd_name, nid);
1673         return exports_evicted;
1674 }
1675 EXPORT_SYMBOL(obd_export_evict_by_nid);
1676
#ifdef HAVE_SERVER_SUPPORT
/**
 * Evict the export of \a obd that belongs to the client with the given
 * uuid.
 *
 * \param[in] obd       device to evict from
 * \param[in] uuid      client uuid in string form
 *
 * \retval 1    an export was found and evicted
 * \retval 0    the device is stopping, \a uuid names the device itself,
 *              or no export matched
 */
int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
{
        struct obd_export *doomed_exp = NULL;
        struct obd_uuid doomed_uuid;
        int exports_evicted = 0;

        spin_lock(&obd->obd_dev_lock);
        if (obd->obd_stopping) {
                spin_unlock(&obd->obd_dev_lock);
                return exports_evicted;
        }
        spin_unlock(&obd->obd_dev_lock);

        obd_str2uuid(&doomed_uuid, uuid);
        if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
                CERROR("%s: can't evict myself\n", obd->obd_name);
                return exports_evicted;
        }

        doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
        if (doomed_exp == NULL) {
                CERROR("%s: can't disconnect %s: no exports found\n",
                       obd->obd_name, uuid);
        } else {
                CWARN("%s: evicting %s at adminstrative request\n",
                       obd->obd_name, doomed_exp->exp_client_uuid.uuid);
                class_fail_export(doomed_exp);
                /* Remove the uuid entry while our reference is still
                 * held; the export must not be touched once the
                 * reference has been dropped. */
                obd_uuid_del(obd, doomed_exp);
                class_export_put(doomed_exp);
                exports_evicted++;
        }

        return exports_evicted;
}
#endif /* HAVE_SERVER_SUPPORT */
1713
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook called by print_export_data() for every export it prints
 * when the caller asked for lock information; NULL means no hook is set. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1718
1719 static void print_export_data(struct obd_export *exp, const char *status,
1720                               int locks, int debug_level)
1721 {
1722         struct ptlrpc_reply_state *rs;
1723         struct ptlrpc_reply_state *first_reply = NULL;
1724         int nreplies = 0;
1725
1726         spin_lock(&exp->exp_lock);
1727         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1728                             rs_exp_list) {
1729                 if (nreplies == 0)
1730                         first_reply = rs;
1731                 nreplies++;
1732         }
1733         spin_unlock(&exp->exp_lock);
1734
1735         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1736                "%p %s %llu stale:%d\n",
1737                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1738                obd_export_nid2str(exp),
1739                refcount_read(&exp->exp_handle.h_ref),
1740                atomic_read(&exp->exp_rpc_count),
1741                atomic_read(&exp->exp_cb_count),
1742                atomic_read(&exp->exp_locks_count),
1743                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1744                nreplies, first_reply, nreplies > 3 ? "..." : "",
1745                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1746 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1747         if (locks && class_export_dump_hook != NULL)
1748                 class_export_dump_hook(exp);
1749 #endif
1750 }
1751
1752 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1753 {
1754         struct obd_export *exp;
1755
1756         spin_lock(&obd->obd_dev_lock);
1757         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1758                 print_export_data(exp, "ACTIVE", locks, debug_level);
1759         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1760                 print_export_data(exp, "UNLINKED", locks, debug_level);
1761         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1762                 print_export_data(exp, "DELAYED", locks, debug_level);
1763         spin_unlock(&obd->obd_dev_lock);
1764 }
1765
1766 void obd_exports_barrier(struct obd_device *obd)
1767 {
1768         int waited = 2;
1769         LASSERT(list_empty(&obd->obd_exports));
1770         spin_lock(&obd->obd_dev_lock);
1771         while (!list_empty(&obd->obd_unlinked_exports)) {
1772                 spin_unlock(&obd->obd_dev_lock);
1773                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1774                 if (waited > 5 && is_power_of_2(waited)) {
1775                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1776                                       "more than %d seconds. "
1777                                       "The obd refcount = %d. Is it stuck?\n",
1778                                       obd->obd_name, waited,
1779                                       atomic_read(&obd->obd_refcount));
1780                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1781                 }
1782                 waited *= 2;
1783                 spin_lock(&obd->obd_dev_lock);
1784         }
1785         spin_unlock(&obd->obd_dev_lock);
1786 }
1787 EXPORT_SYMBOL(obd_exports_barrier);
1788
1789 /**
1790  * Add export to the obd_zombe thread and notify it.
1791  */
1792 static void obd_zombie_export_add(struct obd_export *exp) {
1793         atomic_dec(&obd_stale_export_num);
1794         spin_lock(&exp->exp_obd->obd_dev_lock);
1795         LASSERT(!list_empty(&exp->exp_obd_chain));
1796         list_del_init(&exp->exp_obd_chain);
1797         spin_unlock(&exp->exp_obd->obd_dev_lock);
1798
1799         queue_work(zombie_wq, &exp->exp_zombie_work);
1800 }
1801
1802 /**
1803  * Add import to the obd_zombe thread and notify it.
1804  */
1805 static void obd_zombie_import_add(struct obd_import *imp) {
1806         LASSERT(imp->imp_sec == NULL);
1807
1808         queue_work(zombie_wq, &imp->imp_zombie_work);
1809 }
1810
/**
 * Wait until the obd_zombie import/export queues become empty, i.e. until
 * all work queued on the zombie workqueue so far has been flushed.
 */
void obd_zombie_barrier(void)
{
        flush_workqueue(zombie_wq);
}
EXPORT_SYMBOL(obd_zombie_barrier);
1819
1820
1821 struct obd_export *obd_stale_export_get(void)
1822 {
1823         struct obd_export *exp = NULL;
1824         ENTRY;
1825
1826         spin_lock(&obd_stale_export_lock);
1827         if (!list_empty(&obd_stale_exports)) {
1828                 exp = list_first_entry(&obd_stale_exports,
1829                                        struct obd_export, exp_stale_list);
1830                 list_del_init(&exp->exp_stale_list);
1831         }
1832         spin_unlock(&obd_stale_export_lock);
1833
1834         if (exp) {
1835                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1836                        atomic_read(&obd_stale_export_num));
1837         }
1838         RETURN(exp);
1839 }
1840 EXPORT_SYMBOL(obd_stale_export_get);
1841
1842 void obd_stale_export_put(struct obd_export *exp)
1843 {
1844         ENTRY;
1845
1846         LASSERT(list_empty(&exp->exp_stale_list));
1847         if (exp->exp_lock_hash &&
1848             atomic_read(&exp->exp_lock_hash->hs_count)) {
1849                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1850                        atomic_read(&obd_stale_export_num));
1851
1852                 spin_lock_bh(&exp->exp_bl_list_lock);
1853                 spin_lock(&obd_stale_export_lock);
1854                 /* Add to the tail if there is no blocked locks,
1855                  * to the head otherwise. */
1856                 if (list_empty(&exp->exp_bl_list))
1857                         list_add_tail(&exp->exp_stale_list,
1858                                       &obd_stale_exports);
1859                 else
1860                         list_add(&exp->exp_stale_list,
1861                                  &obd_stale_exports);
1862
1863                 spin_unlock(&obd_stale_export_lock);
1864                 spin_unlock_bh(&exp->exp_bl_list_lock);
1865         } else {
1866                 class_export_put(exp);
1867         }
1868         EXIT;
1869 }
1870 EXPORT_SYMBOL(obd_stale_export_put);
1871
1872 /**
1873  * Adjust the position of the export in the stale list,
1874  * i.e. move to the head of the list if is needed.
1875  **/
1876 void obd_stale_export_adjust(struct obd_export *exp)
1877 {
1878         LASSERT(exp != NULL);
1879         spin_lock_bh(&exp->exp_bl_list_lock);
1880         spin_lock(&obd_stale_export_lock);
1881
1882         if (!list_empty(&exp->exp_stale_list) &&
1883             !list_empty(&exp->exp_bl_list))
1884                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1885
1886         spin_unlock(&obd_stale_export_lock);
1887         spin_unlock_bh(&exp->exp_bl_list_lock);
1888 }
1889 EXPORT_SYMBOL(obd_stale_export_adjust);
1890
1891 /**
1892  * start destroy zombie import/export thread
1893  */
1894 int obd_zombie_impexp_init(void)
1895 {
1896         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1897                                            0, CFS_CPT_ANY,
1898                                            cfs_cpt_number(cfs_cpt_tab));
1899
1900         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1901 }
1902
/**
 * Stop destroying zombie imports/exports: destroy the zombie workqueue
 * and then assert that no export is left on the stale export list.
 */
void obd_zombie_impexp_stop(void)
{
        destroy_workqueue(zombie_wq);
        LASSERT(list_empty(&obd_stale_exports));
}
1911
1912 /***** Kernel-userspace comm helpers *******/
1913
1914 /* Get length of entire message, including header */
1915 int kuc_len(int payload_len)
1916 {
1917         return sizeof(struct kuc_hdr) + payload_len;
1918 }
1919 EXPORT_SYMBOL(kuc_len);
1920
1921 /* Get a pointer to kuc header, given a ptr to the payload
1922  * @param p Pointer to payload area
1923  * @returns Pointer to kuc header
1924  */
1925 struct kuc_hdr * kuc_ptr(void *p)
1926 {
1927         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1928         LASSERT(lh->kuc_magic == KUC_MAGIC);
1929         return lh;
1930 }
1931 EXPORT_SYMBOL(kuc_ptr);
1932
1933 /* Alloc space for a message, and fill in header
1934  * @return Pointer to payload area
1935  */
1936 void *kuc_alloc(int payload_len, int transport, int type)
1937 {
1938         struct kuc_hdr *lh;
1939         int len = kuc_len(payload_len);
1940
1941         OBD_ALLOC(lh, len);
1942         if (lh == NULL)
1943                 return ERR_PTR(-ENOMEM);
1944
1945         lh->kuc_magic = KUC_MAGIC;
1946         lh->kuc_transport = transport;
1947         lh->kuc_msgtype = type;
1948         lh->kuc_msglen = len;
1949
1950         return (void *)(lh + 1);
1951 }
1952 EXPORT_SYMBOL(kuc_alloc);
1953
/* Free a message.  Takes pointer to payload area; the header in front of
 * it is located with kuc_ptr() (which asserts its magic) and header plus
 * payload are released as kuc_len(@payload_len) bytes.
 * NOTE(review): @payload_len presumably has to match the value given to
 * kuc_alloc() - confirm against callers.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1961
/* On-stack bookkeeping for a thread waiting in obd_get_request_slot(). */
struct obd_request_slot_waiter {
        /* link on client_obd::cl_flight_waiters; emptied by the waker */
        struct list_head        orsw_entry;
        /* the waiting thread sleeps here until it is granted a slot */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false by the waiter and tested after the wait */
        bool                    orsw_signaled;
};
1967
1968 static bool obd_request_slot_avail(struct client_obd *cli,
1969                                    struct obd_request_slot_waiter *orsw)
1970 {
1971         bool avail;
1972
1973         spin_lock(&cli->cl_loi_list_lock);
1974         avail = !!list_empty(&orsw->orsw_entry);
1975         spin_unlock(&cli->cl_loi_list_lock);
1976
1977         return avail;
1978 };
1979
1980 /*
1981  * For network flow control, the RPC sponsor needs to acquire a credit
1982  * before sending the RPC. The credits count for a connection is defined
1983  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1984  * the subsequent RPC sponsors need to wait until others released their
1985  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1986  */
1987 int obd_get_request_slot(struct client_obd *cli)
1988 {
1989         struct obd_request_slot_waiter   orsw;
1990         int                              rc;
1991
1992         spin_lock(&cli->cl_loi_list_lock);
1993         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1994                 cli->cl_rpcs_in_flight++;
1995                 spin_unlock(&cli->cl_loi_list_lock);
1996                 return 0;
1997         }
1998
1999         init_waitqueue_head(&orsw.orsw_waitq);
2000         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2001         orsw.orsw_signaled = false;
2002         spin_unlock(&cli->cl_loi_list_lock);
2003
2004         rc = l_wait_event_abortable(orsw.orsw_waitq,
2005                                     obd_request_slot_avail(cli, &orsw) ||
2006                                     orsw.orsw_signaled);
2007
2008         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2009          * freed but other (such as obd_put_request_slot) is using it. */
2010         spin_lock(&cli->cl_loi_list_lock);
2011         if (rc != 0) {
2012                 if (!orsw.orsw_signaled) {
2013                         if (list_empty(&orsw.orsw_entry))
2014                                 cli->cl_rpcs_in_flight--;
2015                         else
2016                                 list_del(&orsw.orsw_entry);
2017                 }
2018                 rc = -EINTR;
2019         }
2020
2021         if (orsw.orsw_signaled) {
2022                 LASSERT(list_empty(&orsw.orsw_entry));
2023
2024                 rc = -EINTR;
2025         }
2026         spin_unlock(&cli->cl_loi_list_lock);
2027
2028         return rc;
2029 }
2030 EXPORT_SYMBOL(obd_get_request_slot);
2031
2032 void obd_put_request_slot(struct client_obd *cli)
2033 {
2034         struct obd_request_slot_waiter *orsw;
2035
2036         spin_lock(&cli->cl_loi_list_lock);
2037         cli->cl_rpcs_in_flight--;
2038
2039         /* If there is free slot, wakeup the first waiter. */
2040         if (!list_empty(&cli->cl_flight_waiters) &&
2041             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2042                 orsw = list_first_entry(&cli->cl_flight_waiters,
2043                                         struct obd_request_slot_waiter,
2044                                         orsw_entry);
2045                 list_del_init(&orsw->orsw_entry);
2046                 cli->cl_rpcs_in_flight++;
2047                 wake_up(&orsw->orsw_waitq);
2048         }
2049         spin_unlock(&cli->cl_loi_list_lock);
2050 }
2051 EXPORT_SYMBOL(obd_put_request_slot);
2052
/* Return the current limit on RPCs in flight of @cli; the value is read
 * without taking cl_loi_list_lock. */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2058
2059 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2060 {
2061         struct obd_request_slot_waiter *orsw;
2062         __u32                           old;
2063         int                             diff;
2064         int                             i;
2065         int                             rc;
2066
2067         if (max > OBD_MAX_RIF_MAX || max < 1)
2068                 return -ERANGE;
2069
2070         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2071                cli->cl_import->imp_obd->obd_name, max,
2072                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2073
2074         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2075                    LUSTRE_MDC_NAME) == 0) {
2076                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2077                  * strictly lower that max_rpcs_in_flight */
2078                 if (max < 2) {
2079                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2080                                cli->cl_import->imp_obd->obd_name);
2081                         return -ERANGE;
2082                 }
2083                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2084                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2085                         if (rc != 0)
2086                                 return rc;
2087                 }
2088         }
2089
2090         spin_lock(&cli->cl_loi_list_lock);
2091         old = cli->cl_max_rpcs_in_flight;
2092         cli->cl_max_rpcs_in_flight = max;
2093         client_adjust_max_dirty(cli);
2094
2095         diff = max - old;
2096
2097         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2098         for (i = 0; i < diff; i++) {
2099                 if (list_empty(&cli->cl_flight_waiters))
2100                         break;
2101
2102                 orsw = list_first_entry(&cli->cl_flight_waiters,
2103                                         struct obd_request_slot_waiter,
2104                                         orsw_entry);
2105                 list_del_init(&orsw->orsw_entry);
2106                 cli->cl_rpcs_in_flight++;
2107                 wake_up(&orsw->orsw_waitq);
2108         }
2109         spin_unlock(&cli->cl_loi_list_lock);
2110
2111         return 0;
2112 }
2113 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2114
/* Return the current limit on modify RPCs in flight of @cli; the value is
 * read without taking cl_mod_rpcs_lock. */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2120
2121 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2122 {
2123         struct obd_connect_data *ocd;
2124         __u16 maxmodrpcs;
2125         __u16 prev;
2126
2127         if (max > OBD_MAX_RIF_MAX || max < 1)
2128                 return -ERANGE;
2129
2130         ocd = &cli->cl_import->imp_connect_data;
2131         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2132                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2133                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2134
2135         if (max == OBD_MAX_RIF_MAX)
2136                 max = OBD_MAX_RIF_MAX - 1;
2137
2138         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2139          * increase this value, also bump up max_rpcs_in_flight to match.
2140          */
2141         if (max >= cli->cl_max_rpcs_in_flight) {
2142                 CDEBUG(D_INFO,
2143                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2144                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2145                 obd_set_max_rpcs_in_flight(cli, max + 1);
2146         }
2147
2148         /* cannot exceed max modify RPCs in flight supported by the server,
2149          * but verify ocd_connect_flags is at least initialized first.  If
2150          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2151          */
2152         if (!ocd->ocd_connect_flags) {
2153                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2154         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2155                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2156                 if (maxmodrpcs == 0) { /* connection not finished yet */
2157                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2158                         CDEBUG(D_INFO,
2159                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2160                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2161                 }
2162         } else {
2163                 maxmodrpcs = 1;
2164         }
2165         if (max > maxmodrpcs) {
2166                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2167                        cli->cl_import->imp_obd->obd_name,
2168                        max, maxmodrpcs);
2169                 return -ERANGE;
2170         }
2171
2172         spin_lock(&cli->cl_mod_rpcs_lock);
2173
2174         prev = cli->cl_max_mod_rpcs_in_flight;
2175         cli->cl_max_mod_rpcs_in_flight = max;
2176
2177         /* wakeup waiters if limit has been increased */
2178         if (cli->cl_max_mod_rpcs_in_flight > prev)
2179                 wake_up(&cli->cl_mod_rpcs_waitq);
2180
2181         spin_unlock(&cli->cl_mod_rpcs_lock);
2182
2183         return 0;
2184 }
2185 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2186
2187 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2188                                struct seq_file *seq)
2189 {
2190         unsigned long mod_tot = 0, mod_cum;
2191         struct timespec64 now;
2192         int i;
2193
2194         ktime_get_real_ts64(&now);
2195
2196         spin_lock(&cli->cl_mod_rpcs_lock);
2197
2198         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2199                    (s64)now.tv_sec, now.tv_nsec);
2200         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2201                    cli->cl_mod_rpcs_in_flight);
2202
2203         seq_printf(seq, "\n\t\t\tmodify\n");
2204         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2205
2206         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2207
2208         mod_cum = 0;
2209         for (i = 0; i < OBD_HIST_MAX; i++) {
2210                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2211                 mod_cum += mod;
2212                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2213                            i, mod, pct(mod, mod_tot),
2214                            pct(mod_cum, mod_tot));
2215                 if (mod_cum == mod_tot)
2216                         break;
2217         }
2218
2219         spin_unlock(&cli->cl_mod_rpcs_lock);
2220
2221         return 0;
2222 }
2223 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2224
2225 /* The number of modify RPCs sent in parallel is limited
2226  * because the server has a finite number of slots per client to
2227  * store request result and ensure reply reconstruction when needed.
2228  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2229  * that takes into account server limit and cl_max_rpcs_in_flight
2230  * value.
2231  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2232  * one close request is allowed above the maximum.
2233  */
2234 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2235                                                  bool close_req)
2236 {
2237         bool avail;
2238
2239         /* A slot is available if
2240          * - number of modify RPCs in flight is less than the max
2241          * - it's a close RPC and no other close request is in flight
2242          */
2243         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2244                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2245
2246         return avail;
2247 }
2248
2249 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2250                                          bool close_req)
2251 {
2252         bool avail;
2253
2254         spin_lock(&cli->cl_mod_rpcs_lock);
2255         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2256         spin_unlock(&cli->cl_mod_rpcs_lock);
2257         return avail;
2258 }
2259
2260
2261 /* Get a modify RPC slot from the obd client @cli according
2262  * to the kind of operation @opc that is going to be sent
2263  * and the intent @it of the operation if it applies.
2264  * If the maximum number of modify RPCs in flight is reached
2265  * the thread is put to sleep.
2266  * Returns the tag to be set in the request message. Tag 0
2267  * is reserved for non-modifying requests.
2268  */
2269 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2270 {
2271         bool                    close_req = false;
2272         __u16                   i, max;
2273
2274         if (opc == MDS_CLOSE)
2275                 close_req = true;
2276
2277         do {
2278                 spin_lock(&cli->cl_mod_rpcs_lock);
2279                 max = cli->cl_max_mod_rpcs_in_flight;
2280                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2281                         /* there is a slot available */
2282                         cli->cl_mod_rpcs_in_flight++;
2283                         if (close_req)
2284                                 cli->cl_close_rpcs_in_flight++;
2285                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2286                                          cli->cl_mod_rpcs_in_flight);
2287                         /* find a free tag */
2288                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2289                                                 max + 1);
2290                         LASSERT(i < OBD_MAX_RIF_MAX);
2291                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2292                         spin_unlock(&cli->cl_mod_rpcs_lock);
2293                         /* tag 0 is reserved for non-modify RPCs */
2294
2295                         CDEBUG(D_RPCTRACE,
2296                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2297                                cli->cl_import->imp_obd->obd_name,
2298                                i + 1, opc, max);
2299
2300                         return i + 1;
2301                 }
2302                 spin_unlock(&cli->cl_mod_rpcs_lock);
2303
2304                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2305                        "opc %u, max %hu\n",
2306                        cli->cl_import->imp_obd->obd_name, opc, max);
2307
2308                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2309                                           obd_mod_rpc_slot_avail(cli,
2310                                                                  close_req));
2311         } while (true);
2312 }
2313 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2314
2315 /* Put a modify RPC slot from the obd client @cli according
2316  * to the kind of operation @opc that has been sent.
2317  */
2318 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2319 {
2320         bool                    close_req = false;
2321
2322         if (tag == 0)
2323                 return;
2324
2325         if (opc == MDS_CLOSE)
2326                 close_req = true;
2327
2328         spin_lock(&cli->cl_mod_rpcs_lock);
2329         cli->cl_mod_rpcs_in_flight--;
2330         if (close_req)
2331                 cli->cl_close_rpcs_in_flight--;
2332         /* release the tag in the bitmap */
2333         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2334         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2335         spin_unlock(&cli->cl_mod_rpcs_lock);
2336         wake_up(&cli->cl_mod_rpcs_waitq);
2337 }
2338 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2339