Whamcloud - gitweb
LU-7420 echo: fix echo server to work with unified target
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdecho/echo.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Andreas Dilger <adilger@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_ECHO
39
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <lustre_debug.h>
43 #include <lustre_dlm.h>
44 #include <lprocfs_status.h>
45
46 #include "echo_internal.h"
47
48 /* The echo objid needs to be below 2^32, because regular FID numbers are
49  * limited to 2^32 objects in f_oid for the FID_SEQ_ECHO range. b=23335 */
50 #define ECHO_INIT_OID        0x10000000ULL
51 #define ECHO_HANDLE_MAGIC    0xabcd0123fedc9876ULL
52
53 #define ECHO_PERSISTENT_PAGES (ECHO_PERSISTENT_SIZE >> PAGE_SHIFT)
54 static struct page *echo_persistent_pages[ECHO_PERSISTENT_PAGES];
55
56 enum {
57         LPROC_ECHO_READ_BYTES = 1,
58         LPROC_ECHO_WRITE_BYTES = 2,
59         LPROC_ECHO_LAST = LPROC_ECHO_WRITE_BYTES +1
60 };
61
62 struct echo_srv_device {
63         struct lu_device esd_dev;
64         struct lu_target esd_lut;
65 };
66
67 static inline struct echo_srv_device *echo_srv_dev(struct lu_device *d)
68 {
69         return container_of0(d, struct echo_srv_device, esd_dev);
70 }
71
72 static inline struct obd_device *echo_srv_obd(struct echo_srv_device *esd)
73 {
74         return esd->esd_dev.ld_obd;
75 }
76
77 static int echo_connect(const struct lu_env *env,
78                         struct obd_export **exp, struct obd_device *obd,
79                         struct obd_uuid *cluuid, struct obd_connect_data *data,
80                         void *localdata)
81 {
82         struct lustre_handle conn = { 0 };
83         int rc;
84
85         data->ocd_connect_flags &= ECHO_CONNECT_SUPPORTED;
86
87         if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
88                 data->ocd_connect_flags2 &= ECHO_CONNECT_SUPPORTED2;
89
90         rc = class_connect(&conn, obd, cluuid);
91         if (rc) {
92                 CERROR("can't connect %d\n", rc);
93                 return rc;
94         }
95         *exp = class_conn2export(&conn);
96
97         return 0;
98 }
99
100 static int echo_disconnect(struct obd_export *exp)
101 {
102         LASSERT (exp != NULL);
103
104         return server_disconnect_export(exp);
105 }
106
107 static int echo_init_export(struct obd_export *exp)
108 {
109         return ldlm_init_export(exp);
110 }
111
112 static int echo_destroy_export(struct obd_export *exp)
113 {
114         ENTRY;
115
116         target_destroy_export(exp);
117         ldlm_destroy_export(exp);
118
119         RETURN(0);
120 }
121
122 static u64 echo_next_id(struct obd_device *obddev)
123 {
124         u64 id;
125
126         spin_lock(&obddev->u.echo.eo_lock);
127         id = ++obddev->u.echo.eo_lastino;
128         spin_unlock(&obddev->u.echo.eo_lock);
129
130         return id;
131 }
132
133 static void
134 echo_page_debug_setup(struct page *page, int rw, u64 id,
135                       __u64 offset, int len)
136 {
137         int   page_offset = offset & ~PAGE_MASK;
138         char *addr        = ((char *)kmap(page)) + page_offset;
139
140         if (len % OBD_ECHO_BLOCK_SIZE != 0)
141                 CERROR("Unexpected block size %d\n", len);
142
143         while (len > 0) {
144                 if (rw & OBD_BRW_READ)
145                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
146                                           offset, id);
147                 else
148                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
149                                           0xecc0ecc0ecc0ecc0ULL,
150                                           0xecc0ecc0ecc0ecc0ULL);
151
152                 addr   += OBD_ECHO_BLOCK_SIZE;
153                 offset += OBD_ECHO_BLOCK_SIZE;
154                 len    -= OBD_ECHO_BLOCK_SIZE;
155         }
156
157         kunmap(page);
158 }
159
160 static int
161 echo_page_debug_check(struct page *page, u64 id,
162                       __u64 offset, int len)
163 {
164         int   page_offset = offset & ~PAGE_MASK;
165         char *addr        = ((char *)kmap(page)) + page_offset;
166         int   rc          = 0;
167         int   rc2;
168
169         if (len % OBD_ECHO_BLOCK_SIZE != 0)
170                 CERROR("Unexpected block size %d\n", len);
171
172         while (len > 0) {
173                 rc2 = block_debug_check("echo", addr, OBD_ECHO_BLOCK_SIZE,
174                                         offset, id);
175
176                 if (rc2 != 0 && rc == 0)
177                         rc = rc2;
178
179                 addr   += OBD_ECHO_BLOCK_SIZE;
180                 offset += OBD_ECHO_BLOCK_SIZE;
181                 len    -= OBD_ECHO_BLOCK_SIZE;
182         }
183
184         kunmap(page);
185
186         return rc;
187 }
188
189 static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
190                              struct niobuf_remote *nb, int *pages,
191                              struct niobuf_local *lb, int cmd, int *left)
192 {
193         gfp_t gfp_mask = (ostid_id(&obj->ioo_oid) & 1) ?
194                         GFP_HIGHUSER : GFP_KERNEL;
195         int ispersistent = ostid_id(&obj->ioo_oid) == ECHO_PERSISTENT_OBJID;
196         int debug_setup = (!ispersistent &&
197                            (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
198                            (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
199         struct niobuf_local *res = lb;
200         u64 offset = nb->rnb_offset;
201         int len = nb->rnb_len;
202
203         while (len > 0) {
204                 int plen = PAGE_SIZE - (offset & (PAGE_SIZE-1));
205                 if (len < plen)
206                         plen = len;
207
208                 /* check for local buf overflow */
209                 if (*left == 0)
210                         return -EINVAL;
211
212                 res->lnb_file_offset = offset;
213                 res->lnb_len = plen;
214                 LASSERT((res->lnb_file_offset & ~PAGE_MASK) +
215                         res->lnb_len <= PAGE_SIZE);
216
217                 if (ispersistent &&
218                     ((res->lnb_file_offset >> PAGE_SHIFT) <
219                       ECHO_PERSISTENT_PAGES)) {
220                         res->lnb_page =
221                                 echo_persistent_pages[res->lnb_file_offset >>
222                                                       PAGE_SHIFT];
223                         /* Take extra ref so __free_pages() can be called OK */
224                         get_page(res->lnb_page);
225                 } else {
226                         res->lnb_page = alloc_page(gfp_mask);
227                         if (res->lnb_page == NULL) {
228                                 CERROR("can't get page for id " DOSTID"\n",
229                                        POSTID(&obj->ioo_oid));
230                                 return -ENOMEM;
231                         }
232                 }
233
234                 CDEBUG(D_PAGE, "$$$$ get page %p @ %llu for %d\n",
235                        res->lnb_page, res->lnb_file_offset, res->lnb_len);
236
237                 if (cmd & OBD_BRW_READ)
238                         res->lnb_rc = res->lnb_len;
239
240                 if (debug_setup)
241                         echo_page_debug_setup(res->lnb_page, cmd,
242                                               ostid_id(&obj->ioo_oid),
243                                               res->lnb_file_offset,
244                                               res->lnb_len);
245
246                 offset += plen;
247                 len -= plen;
248                 res++;
249
250                 (*left)--;
251                 (*pages)++;
252         }
253
254         return 0;
255 }
256
257 static int echo_finalize_lb(struct obdo *oa, struct obd_ioobj *obj,
258                             struct niobuf_remote *rb, int *pgs,
259                             struct niobuf_local *lb, int verify)
260 {
261         struct niobuf_local *res = lb;
262         u64 start = rb->rnb_offset >> PAGE_SHIFT;
263         u64 end   = (rb->rnb_offset + rb->rnb_len + PAGE_SIZE - 1) >>
264                     PAGE_SHIFT;
265         int     count  = (int)(end - start);
266         int     rc     = 0;
267         int     i;
268
269         for (i = 0; i < count; i++, (*pgs) ++, res++) {
270                 struct page *page = res->lnb_page;
271                 void       *addr;
272
273                 if (page == NULL) {
274                         CERROR("null page objid %llu:%p, buf %d/%d\n",
275                                ostid_id(&obj->ioo_oid), page, i,
276                                obj->ioo_bufcnt);
277                         return -EFAULT;
278                 }
279
280                 addr = kmap(page);
281
282                 CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@%llu\n",
283                        res->lnb_page, addr, res->lnb_file_offset);
284
285                 if (verify) {
286                         int vrc = echo_page_debug_check(page,
287                                                         ostid_id(&obj->ioo_oid),
288                                                         res->lnb_file_offset,
289                                                         res->lnb_len);
290                         /* check all the pages always */
291                         if (vrc != 0 && rc == 0)
292                                 rc = vrc;
293                 }
294
295                 kunmap(page);
296                 /* NB see comment above regarding persistent pages */
297                 __free_page(page);
298         }
299
300         return rc;
301 }
302
303 static int echo_preprw(const struct lu_env *env, int cmd,
304                        struct obd_export *export, struct obdo *oa,
305                        int objcount, struct obd_ioobj *obj,
306                        struct niobuf_remote *nb, int *pages,
307                        struct niobuf_local *res)
308 {
309         struct obd_device *obd;
310         int tot_bytes = 0;
311         int rc = 0;
312         int i, left;
313         ENTRY;
314
315         obd = export->exp_obd;
316         if (obd == NULL)
317                 RETURN(-EINVAL);
318
319         /* Temp fix to stop falling foul of osc_announce_cached() */
320         oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
321
322         memset(res, 0, sizeof(*res) * *pages);
323
324         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
325                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
326
327         left = *pages;
328         *pages = 0;
329
330         for (i = 0; i < objcount; i++, obj++) {
331                 int j;
332
333                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++) {
334
335                         rc = echo_map_nb_to_lb(oa, obj, nb, pages,
336                                                res + *pages, cmd, &left);
337                         if (rc)
338                                 GOTO(preprw_cleanup, rc);
339
340                         tot_bytes += nb->rnb_len;
341                 }
342         }
343
344         atomic_add(*pages, &obd->u.echo.eo_prep);
345
346         if (cmd & OBD_BRW_READ)
347                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
348                                     tot_bytes);
349         else
350                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
351                                     tot_bytes);
352
353         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
354                atomic_read(&obd->u.echo.eo_prep));
355
356         RETURN(0);
357
358 preprw_cleanup:
359         /* It is possible that we would rather handle errors by  allow
360          * any already-set-up pages to complete, rather than tearing them
361          * all down again.  I believe that this is what the in-kernel
362          * prep/commit operations do.
363          */
364         CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
365         for (i = 0; i < *pages; i++) {
366                 kunmap(res[i].lnb_page);
367                 /* NB if this is a persistent page, __free_page() will just
368                  * lose the extra ref gained above */
369                 __free_page(res[i].lnb_page);
370                 res[i].lnb_page = NULL;
371                 atomic_dec(&obd->u.echo.eo_prep);
372         }
373
374         return rc;
375 }
376
377 static int echo_commitrw(const struct lu_env *env, int cmd,
378                          struct obd_export *export, struct obdo *oa,
379                          int objcount, struct obd_ioobj *obj,
380                          struct niobuf_remote *rb, int niocount,
381                          struct niobuf_local *res, int rc)
382 {
383         struct obd_device *obd;
384         int pgs = 0;
385         int i;
386         ENTRY;
387
388         obd = export->exp_obd;
389         if (obd == NULL)
390                 RETURN(-EINVAL);
391
392         if (rc)
393                 GOTO(commitrw_cleanup, rc);
394
395         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
396                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
397                        objcount, niocount);
398         } else {
399                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
400                        objcount, niocount);
401         }
402
403         if (niocount && res == NULL) {
404                 CERROR("NULL res niobuf with niocount %d\n", niocount);
405                 RETURN(-EINVAL);
406         }
407
408         for (i = 0; i < objcount; i++, obj++) {
409                 int verify = (rc == 0 &&
410                              ostid_id(&obj->ioo_oid) != ECHO_PERSISTENT_OBJID &&
411                               (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
412                               (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
413                 int j;
414
415                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, rb++) {
416                         int vrc = echo_finalize_lb(oa, obj, rb, &pgs, &res[pgs],
417                                                    verify);
418                         if (vrc == 0)
419                                 continue;
420
421                         if (vrc == -EFAULT)
422                                 GOTO(commitrw_cleanup, rc = vrc);
423
424                         if (rc == 0)
425                                 rc = vrc;
426                 }
427
428         }
429
430         atomic_sub(pgs, &obd->u.echo.eo_prep);
431
432         CDEBUG(D_PAGE, "%d pages remain after commit\n",
433                atomic_read(&obd->u.echo.eo_prep));
434         RETURN(rc);
435
436 commitrw_cleanup:
437         atomic_sub(pgs, &obd->u.echo.eo_prep);
438
439         CERROR("cleaning up %d pages (%d obdos)\n",
440                niocount - pgs - 1, objcount);
441
442         while (pgs < niocount) {
443                 struct page *page = res[pgs++].lnb_page;
444
445                 if (page == NULL)
446                         continue;
447
448                 /* NB see comment above regarding persistent pages */
449                 __free_page(page);
450                 atomic_dec(&obd->u.echo.eo_prep);
451         }
452         return rc;
453 }
454
455 LPROC_SEQ_FOPS_RO_TYPE(echo, uuid);
456 static struct lprocfs_vars lprocfs_echo_obd_vars[] = {
457         { .name =       "uuid",
458           .fops =       &echo_uuid_fops         },
459         { NULL }
460 };
461
462 struct obd_ops echo_obd_ops = {
463         .o_owner           = THIS_MODULE,
464         .o_connect         = echo_connect,
465         .o_disconnect      = echo_disconnect,
466         .o_init_export     = echo_init_export,
467         .o_destroy_export  = echo_destroy_export,
468         .o_preprw          = echo_preprw,
469         .o_commitrw        = echo_commitrw,
470 };
471
472 /**
473  * Echo Server request handler for OST_CREATE RPC.
474  *
475  * This is part of request processing. Its simulates the object
476  * creation on OST.
477  *
478  * \param[in] tsi       target session environment for this request
479  *
480  * \retval              0 if successful
481  * \retval              negative value on error
482  */
483 static int esd_create_hdl(struct tgt_session_info *tsi)
484 {
485         const struct obdo *oa = &tsi->tsi_ost_body->oa;
486         struct obd_device *obd = tsi->tsi_exp->exp_obd;
487         struct ost_body *repbody;
488         struct obdo *rep_oa;
489
490         ENTRY;
491
492         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
493         if (repbody == NULL)
494                 RETURN(-ENOMEM);
495
496         if (!(oa->o_mode & S_IFMT)) {
497                 CERROR("%s: no type is set in obdo!\n",
498                        tsi->tsi_exp->exp_obd->obd_name);
499                 RETURN(-ENOENT);
500         }
501
502         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
503                 CERROR("%s: invalid o_valid in obdo: %#llx\n",
504                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
505                 RETURN(-EINVAL);
506         }
507
508         rep_oa = &repbody->oa;
509
510         if (!fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
511                 CERROR("%s: invalid seq %#llx\n",
512                        tsi->tsi_exp->exp_obd->obd_name, ostid_seq(&oa->o_oi));
513                 return -EINVAL;
514         }
515
516         ostid_set_seq_echo(&rep_oa->o_oi);
517         ostid_set_id(&rep_oa->o_oi, echo_next_id(obd));
518
519         CDEBUG(D_INFO, "%s: Create object "DOSTID"\n",
520                tsi->tsi_exp->exp_obd->obd_name, POSTID(&rep_oa->o_oi));
521
522         rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
523
524         RETURN(0);
525 }
526
527 /**
528  * Echo Server request handler for OST_DESTROY RPC.
529  *
530  * This is Echo Server part of request handling. It simulates the objects
531  * destroy on OST.
532  *
533  * \param[in] tsi       target session environment for this request
534  *
535  * \retval              0 if successful
536  * \retval              negative value on error
537  */
538 static int esd_destroy_hdl(struct tgt_session_info *tsi)
539 {
540         const struct obdo *oa = &tsi->tsi_ost_body->oa;
541         struct obd_device *obd = tsi->tsi_exp->exp_obd;
542         struct ost_body *repbody;
543         u64 oid;
544
545         ENTRY;
546
547         oid = ostid_id(&oa->o_oi);
548         LASSERT(oid != 0);
549
550         if (!(oa->o_valid & OBD_MD_FLID)) {
551                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
552                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
553                 RETURN(-EINVAL);
554         }
555
556         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
557
558         if (ostid_id(&oa->o_oi) > obd->u.echo.eo_lastino ||
559             ostid_id(&oa->o_oi) < ECHO_INIT_OID) {
560                 CERROR("%s: bad objid to destroy: "DOSTID"\n",
561                        tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
562                 RETURN(-EINVAL);
563         }
564
565         CDEBUG(D_INFO, "%s: Destroy object "DOSTID"\n",
566                tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
567
568         repbody->oa.o_oi = oa->o_oi;
569         RETURN(0);
570 }
571
572 /**
573  * Echo Server request handler for OST_GETATTR RPC.
574  *
575  * This is Echo Server part of request handling. It returns an object
576  * attributes to the client. All objects have the same attributes in
577  * Echo Server.
578  *
579  * \param[in] tsi       target session environment for this request
580  *
581  * \retval              0 if successful
582  * \retval              negative value on error
583  */
584 static int esd_getattr_hdl(struct tgt_session_info *tsi)
585 {
586         const struct obdo *oa = &tsi->tsi_ost_body->oa;
587         struct obd_device *obd = tsi->tsi_exp->exp_obd;
588         struct ost_body *repbody;
589
590         ENTRY;
591
592         if (!(oa->o_valid & OBD_MD_FLID)) {
593                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
594                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
595                 RETURN(-EINVAL);
596         }
597
598         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
599         if (repbody == NULL)
600                 RETURN(-ENOMEM);
601
602         repbody->oa.o_oi = oa->o_oi;
603         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
604
605         obdo_cpy_md(&repbody->oa, &obd->u.echo.eo_oa, oa->o_valid);
606
607         repbody->oa.o_valid |= OBD_MD_FLFLAGS;
608         repbody->oa.o_flags = OBD_FL_FLUSH;
609
610         RETURN(0);
611 }
612
613 /**
614  * Echo Server request handler for OST_SETATTR RPC.
615  *
616  * This is Echo Server part of request handling. It sets common
617  * attributes from request to the Echo Server objects.
618  *
619  * \param[in] tsi       target session environment for this request
620  *
621  * \retval              0 if successful
622  * \retval              negative value on error
623  */
624 static int esd_setattr_hdl(struct tgt_session_info *tsi)
625 {
626         struct ost_body *body = tsi->tsi_ost_body;
627         struct obd_device *obd = tsi->tsi_exp->exp_obd;
628         struct ost_body *repbody;
629
630         ENTRY;
631
632         if (!(body->oa.o_valid & OBD_MD_FLID)) {
633                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
634                        tsi->tsi_exp->exp_obd->obd_name,
635                        body->oa.o_valid);
636                 RETURN(-EINVAL);
637         }
638
639         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
640         if (repbody == NULL)
641                 RETURN(-ENOMEM);
642
643         repbody->oa.o_oi = body->oa.o_oi;
644         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
645
646         obd->u.echo.eo_oa = body->oa;
647
648         RETURN(0);
649 }
650
651 #define OBD_FAIL_OST_READ_NET   OBD_FAIL_OST_BRW_NET
652 #define OBD_FAIL_OST_WRITE_NET  OBD_FAIL_OST_BRW_NET
653 #define OST_BRW_READ    OST_READ
654 #define OST_BRW_WRITE   OST_WRITE
655
656 /**
657  * Table of Echo Server specific request handlers
658  *
659  * This table contains all opcodes accepted by Echo Server and
660  * specifies handlers for them. The tgt_request_handler()
661  * uses such table from each target to process incoming
662  * requests.
663  */
664 static struct tgt_handler esd_tgt_handlers[] = {
665 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_CONNECT, tgt_connect,
666                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
667 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_DISCONNECT, tgt_disconnect,
668                 &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
669 TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_GETATTR, esd_getattr_hdl),
670 TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO | MUTABOR, OST_SETATTR,
671             esd_setattr_hdl),
672 TGT_OST_HDL(HABEO_REFERO | MUTABOR, OST_CREATE, esd_create_hdl),
673 TGT_OST_HDL(HABEO_REFERO | MUTABOR, OST_DESTROY, esd_destroy_hdl),
674 TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read),
675 TGT_OST_HDL(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write),
676 };
677
678 static struct tgt_opc_slice esd_common_slice[] = {
679         {
680                 .tos_opc_start  = OST_FIRST_OPC,
681                 .tos_opc_end    = OST_LAST_OPC,
682                 .tos_hs         = esd_tgt_handlers
683         },
684         {
685                 .tos_opc_start  = OBD_FIRST_OPC,
686                 .tos_opc_end    = OBD_LAST_OPC,
687                 .tos_hs         = tgt_obd_handlers
688         },
689         {
690                 .tos_opc_start  = LDLM_FIRST_OPC,
691                 .tos_opc_end    = LDLM_LAST_OPC,
692                 .tos_hs         = tgt_dlm_handlers
693         },
694         {
695                 .tos_opc_start  = SEC_FIRST_OPC,
696                 .tos_opc_end    = SEC_LAST_OPC,
697                 .tos_hs         = tgt_sec_ctx_handlers
698         },
699         {
700                 .tos_hs         = NULL
701         }
702 };
703
704 /**
705  * lu_device_operations matrix for ECHO SRV device is NULL,
706  * this device is just serving incoming requests immediately
707  * without building a stack of lu_devices.
708  */
709 static struct lu_device_operations echo_srv_lu_ops = { 0 };
710
711 /**
712  * Initialize Echo Server device with parameters in the config log \a cfg.
713  *
714  * This is the main starting point of Echo Server initialization. It fills all
715  * parameters with their initial values and starts Echo Server.
716  *
717  * \param[in] env       execution environment
718  * \param[in] m         Echo Server device
719  * \param[in] ldt       LU device type of Echo Server
720  * \param[in] cfg       configuration log
721  *
722  * \retval              0 if successful
723  * \retval              negative value on error
724  */
725 static int echo_srv_init0(const struct lu_env *env,
726                           struct echo_srv_device *esd,
727                           struct lu_device_type *ldt, struct lustre_cfg *cfg)
728 {
729         const char *dev = lustre_cfg_string(cfg, 0);
730         struct obd_device *obd;
731         char ns_name[48];
732         int rc;
733
734         ENTRY;
735
736         obd = class_name2obd(dev);
737         if (obd == NULL) {
738                 CERROR("Cannot find obd with name %s\n", dev);
739                 RETURN(-ENODEV);
740         }
741
742         spin_lock_init(&obd->u.echo.eo_lock);
743         obd->u.echo.eo_lastino = ECHO_INIT_OID;
744
745         esd->esd_dev.ld_ops = &echo_srv_lu_ops;
746         esd->esd_dev.ld_obd = obd;
747         /* set this lu_device to obd, because error handling need it */
748         obd->obd_lu_dev = &esd->esd_dev;
749
750         /* No connection accepted until configurations will finish */
751         spin_lock(&obd->obd_dev_lock);
752         obd->obd_no_conn = 1;
753         spin_unlock(&obd->obd_dev_lock);
754
755         /* non-replayable target */
756         obd->obd_replayable = 0;
757
758         snprintf(ns_name, sizeof(ns_name), "echotgt-%s", obd->obd_uuid.uuid);
759         obd->obd_namespace = ldlm_namespace_new(obd, ns_name,
760                                                 LDLM_NAMESPACE_SERVER,
761                                                 LDLM_NAMESPACE_MODEST,
762                                                 LDLM_NS_TYPE_OST);
763         if (obd->obd_namespace == NULL)
764                 RETURN(-ENOMEM);
765
766         obd->obd_vars = lprocfs_echo_obd_vars;
767         if (!lprocfs_obd_setup(obd, true) &&
768             lprocfs_alloc_obd_stats(obd, LPROC_ECHO_LAST) == 0) {
769                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_READ_BYTES,
770                                      LPROCFS_CNTR_AVGMINMAX,
771                                      "read_bytes", "bytes");
772                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
773                                      LPROCFS_CNTR_AVGMINMAX,
774                                      "write_bytes", "bytes");
775         }
776
777         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
778                            "echo_ldlm_cb_client", &obd->obd_ldlm_client);
779
780         rc = tgt_init(env, &esd->esd_lut, obd, NULL, esd_common_slice,
781                       OBD_FAIL_OST_ALL_REQUEST_NET,
782                       OBD_FAIL_OST_ALL_REPLY_NET);
783         if (rc)
784                 GOTO(err_out, rc);
785
786         spin_lock(&obd->obd_dev_lock);
787         obd->obd_no_conn = 0;
788         spin_unlock(&obd->obd_dev_lock);
789
790         RETURN(0);
791
792 err_out:
793         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
794         obd->obd_namespace = NULL;
795
796         lprocfs_obd_cleanup(obd);
797         lprocfs_free_obd_stats(obd);
798         RETURN(rc);
799 }
800
801 /**
802  * Stop the Echo Server device.
803  *
804  * This function stops the Echo Server device and all its subsystems.
805  * This is the end of Echo Server lifecycle.
806  *
807  * \param[in] env       execution environment
808  * \param[in] esd               ESD device
809  */
810 static void echo_srv_fini(const struct lu_env *env,
811                           struct echo_srv_device *esd)
812 {
813         struct obd_device *obd = echo_srv_obd(esd);
814         struct lu_device *d = &esd->esd_dev;
815         int leaked;
816
817         ENTRY;
818
819         class_disconnect_exports(obd);
820         if (obd->obd_namespace != NULL)
821                 ldlm_namespace_free_prior(obd->obd_namespace, NULL,
822                                           obd->obd_force);
823
824         obd_exports_barrier(obd);
825         obd_zombie_barrier();
826
827         tgt_fini(env, &esd->esd_lut);
828
829         if (obd->obd_namespace != NULL) {
830                 ldlm_namespace_free_post(obd->obd_namespace);
831                 obd->obd_namespace = NULL;
832         }
833
834         lprocfs_obd_cleanup(obd);
835         lprocfs_free_obd_stats(obd);
836
837         leaked = atomic_read(&obd->u.echo.eo_prep);
838         if (leaked != 0)
839                 CERROR("%d prep/commitrw pages leaked\n", leaked);
840
841         LASSERT(atomic_read(&d->ld_ref) == 0);
842         EXIT;
843 }
844
845 /**
846  * Implementation of lu_device_type_operations::ldto_device_fini.
847  *
848  * Finalize device. Dual to echo_srv_device_init(). It is called from
849  * obd_precleanup() and stops the current device.
850  *
851  * \param[in] env       execution environment
852  * \param[in] d         LU device of ESD
853  *
854  * \retval              NULL
855  */
856 static struct lu_device *echo_srv_device_fini(const struct lu_env *env,
857                                               struct lu_device *d)
858 {
859         ENTRY;
860         echo_srv_fini(env, echo_srv_dev(d));
861         RETURN(NULL);
862 }
863
864 /**
865  * Implementation of lu_device_type_operations::ldto_device_free.
866  *
867  * Free Echo Server device. Dual to echo_srv_device_alloc().
868  *
869  * \param[in] env       execution environment
870  * \param[in] d         LU device of ESD
871  *
872  * \retval              NULL
873  */
874 static struct lu_device *echo_srv_device_free(const struct lu_env *env,
875                                               struct lu_device *d)
876 {
877         struct echo_srv_device *esd = echo_srv_dev(d);
878
879         lu_device_fini(&esd->esd_dev);
880         OBD_FREE_PTR(esd);
881         RETURN(NULL);
882 }
883
884 /**
885  * Implementation of lu_device_type_operations::ldto_device_alloc.
886  *
887  * This function allocates the new Echo Server device. It is called from
888  * obd_setup() if OBD device had lu_device_type defined.
889  *
890  * \param[in] env       execution environment
891  * \param[in] t         lu_device_type of ESD device
892  * \param[in] cfg       configuration log
893  *
894  * \retval              pointer to the lu_device of just allocated OFD
895  * \retval              ERR_PTR of return value on error
896  */
897 static struct lu_device *echo_srv_device_alloc(const struct lu_env *env,
898                                                struct lu_device_type *t,
899                                                struct lustre_cfg *cfg)
900 {
901         struct echo_srv_device *esd;
902         struct lu_device *l;
903         int rc;
904
905         OBD_ALLOC_PTR(esd);
906         if (esd == NULL)
907                 return ERR_PTR(-ENOMEM);
908
909         l = &esd->esd_dev;
910         lu_device_init(l, t);
911         rc = echo_srv_init0(env, esd, t, cfg);
912         if (rc != 0) {
913                 echo_srv_device_free(env, l);
914                 l = ERR_PTR(rc);
915         }
916
917         return l;
918 }
919
920 static const struct lu_device_type_operations echo_srv_type_ops = {
921         .ldto_device_alloc = echo_srv_device_alloc,
922         .ldto_device_free = echo_srv_device_free,
923         .ldto_device_fini = echo_srv_device_fini
924 };
925
926 struct lu_device_type echo_srv_type = {
927         .ldt_tags = LU_DEVICE_DT,
928         .ldt_name = LUSTRE_ECHO_NAME,
929         .ldt_ops = &echo_srv_type_ops,
930         .ldt_ctx_tags = LCT_DT_THREAD,
931 };
932
933 void echo_persistent_pages_fini(void)
934 {
935         int i;
936
937         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++)
938                 if (echo_persistent_pages[i] != NULL) {
939                         __free_page(echo_persistent_pages[i]);
940                         echo_persistent_pages[i] = NULL;
941                 }
942 }
943
944 int echo_persistent_pages_init(void)
945 {
946         struct page *pg;
947         int          i;
948
949         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++) {
950                 gfp_t gfp_mask = (i < ECHO_PERSISTENT_PAGES/2) ?
951                         GFP_KERNEL : GFP_HIGHUSER;
952
953                 pg = alloc_page(gfp_mask);
954                 if (pg == NULL) {
955                         echo_persistent_pages_fini();
956                         return -ENOMEM;
957                 }
958
959                 memset(kmap(pg), 0, PAGE_SIZE);
960                 kunmap(pg);
961
962                 echo_persistent_pages[i] = pg;
963         }
964
965         return 0;
966 }