LU-6070 libcfs: provide separate buffers for libcfs_*2str()
lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_ioctl.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include <obd_class.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        obd_count                 aa_page_count;
        int                       aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
        struct obd_capa  *aa_ocapa;
        struct cl_req            *aa_clerq;
};

#define osc_grant_args osc_brw_async_args

struct osc_async_args {
        struct obd_info *aa_oi;
};

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info *fa_oi;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        ldlm_type_t             oa_type;
        ldlm_mode_t             oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

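/*
 * Descriptive note: the helpers above are used in a request-building idiom
 * repeated throughout this file: allocate the request, size the optional
 * capability field, pack the capsule for the OST opcode, fill the ost_body
 * (and capa) from the obd_info, then set the reply length.
 * osc_getattr_async() below is a minimal instance of this pattern.
 */
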
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

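/*
 * A caller drives the request set itself; a minimal usage sketch (error
 * handling elided, standard ptlrpc set API assumed):
 *
 *	set = ptlrpc_prep_set();
 *	rc = osc_getattr_async(exp, oinfo, set);
 *	rc = ptlrpc_set_wait(set);
 *	ptlrpc_set_destroy(set);
 */
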
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

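/*
 * Dispatch summary for the function above (descriptive note): with
 * rqset == NULL the request is fire-and-forget via ptlrpcd and no interpret
 * callback is set; with rqset == PTLRPCD_SET it still goes to ptlrpcd but
 * the upcall runs on completion; otherwise it is added to the caller's set
 * and the caller must wait on it.
 */
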
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matching @mode in the resource named
 * by the object id in @oa.  Found locks are added to the @cancels list.
 * Returns the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return early (i.e. cancel nothing) only if early lock cancel (ELC)
         * is supported (flag in the export) but has been disabled through
         * procfs (flag in the namespace).
         *
         * This is distinct from the case where ELC was never supported:
         * there we still want to cancel locks in advance, just locally,
         * without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

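/*
 * Descriptive note: osc_can_send_destroy() implements an optimistic,
 * lock-free throttle.  The slot is claimed first (atomic_inc_return); if
 * that overshoots cl_max_rpcs_in_flight the claim is rolled back, and if
 * the rollback observes that a slot was freed concurrently, the waitqueue
 * is woken so a sleeping caller can retry.
 */
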
/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct obd_trans_info *oti)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, NULL);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* If osc_destroy() is for destroying an unlink orphan, the request
         * is sent from the MDT to the OST and must not block here, because
         * the process may be running in ptlrpcd context, and it is not good
         * to block a ptlrpcd thread (b=16006). */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

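/*
 * Worked example for o_undirty above (assumed tunables): with
 * cl_max_pages_per_rpc = 256, a 4KB PAGE_CACHE_SIZE and
 * cl_max_rpcs_in_flight = 8, max_in_flight = 256 * 4KB * (8 + 1) = 9MB,
 * so the client advertises at least 9MB of "undirty" headroom even if
 * cl_dirty_max_pages is configured lower.
 */
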
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

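/*
 * Worked example (assumed tunables): with cl_max_rpcs_in_flight = 8 and a
 * 1MB RPC (cl_max_pages_per_rpc << PAGE_CACHE_SHIFT), the first-stage
 * target is (8 + 1) * 1MB = 9MB; once cl_avail_grant is already at or
 * below that, the target collapses to a single RPC's worth, i.e. 1MB.
 */
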
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume this OSC got a short read because it read beyond the end of a
 * stripe object; i.e. Lustre is reading a sparse file via the LOV, and it
 * _knows_ it is reading inside the file, it is just that this stripe was
 * never written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

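/*
 * Worked example: with three 4096-byte pages and nob_read = 5000, page 0
 * is consumed whole (nob_read drops to 904), page 1 has bytes 904..4095
 * zeroed in place, and page 2 is then zeroed entirely by the second loop.
 */
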
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

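/*
 * Example: a page at off = 0 with count = 4096 merges with one at
 * off = 4096 when their flags agree, so osc_brw_prep_request() below can
 * describe both with a single niobuf_remote instead of two.
 */
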
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

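/*
 * Coverage example: for nob = 6000 over two 4096-byte pages, all 4096
 * bytes of page 0 are hashed, then only the first min(count, nob) = 1904
 * bytes of page 1, since "count" is capped by the remaining nob before
 * each cfs_crypto_hash_update_page() call.
 */
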
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1", both for compatibility with old clients that
         * send "0", and so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

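/*
 * Capsule layout produced above (sketch): an ost_body carrying the wire
 * obdo and checksum flags, one obd_ioobj with ioo_bufcnt = niocount, an
 * array of niocount niobuf_remote entries (contiguous same-flag pages
 * merged), an optional capability, plus a bulk descriptor that pins the
 * pages themselves.
 */
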
1322 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1323                                 __u32 client_cksum, __u32 server_cksum, int nob,
1324                                 obd_count page_count, struct brw_page **pga,
1325                                 cksum_type_t client_cksum_type)
1326 {
1327         __u32 new_cksum;
1328         char *msg;
1329         cksum_type_t cksum_type;
1330
1331         if (server_cksum == client_cksum) {
1332                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1333                 return 0;
1334         }
1335
1336         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1337                                        oa->o_flags : 0);
1338         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1339                                       cksum_type);
1340
1341         if (cksum_type != client_cksum_type)
1342                 msg = "the server did not use the checksum type specified in "
1343                       "the original request - likely a protocol problem";
1344         else if (new_cksum == server_cksum)
1345                 msg = "changed on the client after we checksummed it - "
1346                       "likely false positive due to mmap IO (bug 11742)";
1347         else if (new_cksum == client_cksum)
1348                 msg = "changed in transit before arrival at OST";
1349         else
1350                 msg = "changed in transit AND doesn't match the original - "
1351                       "likely false positive due to mmap IO (bug 11742)";
1352
1353         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1354                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1355                            msg, libcfs_nid2str(peer->nid),
1356                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1359                            POSTID(&oa->o_oi), pga[0]->off,
1360                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362                "client csum now %x\n", client_cksum, client_cksum_type,
1363                server_cksum, cksum_type, new_cksum);
1364         return 1;
1365 }
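
/*
 * For reference (annotation added here, not in the original source): the
 * diagnosis above compares three values -- the client's original checksum C,
 * the server's checksum S, and a recomputed client checksum R over the same
 * pages:
 *
 *	R == S, R != C: pages changed on the client after C was computed
 *	R == C, R != S: data changed in transit before reaching the OST
 *	R != C, R != S: changed in transit and matches neither side
 */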
1366
1367 /* Note: rc enters this function as the number of bytes transferred */
1368 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1369 {
1370         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1371         const lnet_process_id_t *peer =
1372                         &req->rq_import->imp_connection->c_peer;
1373         struct client_obd *cli = aa->aa_cli;
1374         struct ost_body *body;
1375         u32 client_cksum = 0;
1376         ENTRY;
1377
1378         if (rc < 0 && rc != -EDQUOT) {
1379                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1380                 RETURN(rc);
1381         }
1382
1383         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1384         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1385         if (body == NULL) {
1386                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear the over-quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1393                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1394
1395                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1396                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1397                        body->oa.o_flags);
1398                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1399         }
1400
1401         osc_update_grant(cli, body);
1402
1403         if (rc < 0)
1404                 RETURN(rc);
1405
1406         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1407                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408
1409         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1410                 if (rc > 0) {
1411                         CERROR("Unexpected +ve rc %d\n", rc);
1412                         RETURN(-EPROTO);
1413                 }
1414                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1415
1416                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1417                         RETURN(-EAGAIN);
1418
1419                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1420                     check_write_checksum(&body->oa, peer, client_cksum,
1421                                          body->oa.o_cksum, aa->aa_requested_nob,
1422                                          aa->aa_page_count, aa->aa_ppga,
1423                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1424                         RETURN(-EAGAIN);
1425
1426                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1427                                      aa->aa_page_count, aa->aa_ppga);
1428                 GOTO(out, rc);
1429         }
1430
1431         /* The rest of this function executes only for OST_READs */
1432
1433         /* if unwrap_bulk failed, return -EAGAIN to retry */
1434         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1435         if (rc < 0)
1436                 GOTO(out, rc = -EAGAIN);
1437
1438         if (rc > aa->aa_requested_nob) {
1439                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1440                        aa->aa_requested_nob);
1441                 RETURN(-EPROTO);
1442         }
1443
1444         if (rc != req->rq_bulk->bd_nob_transferred) {
1445                 CERROR("Unexpected rc %d (%d transferred)\n",
1446                        rc, req->rq_bulk->bd_nob_transferred);
1447                 RETURN(-EPROTO);
1448         }
1449
1450         if (rc < aa->aa_requested_nob)
1451                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1452
1453         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1454                 static int cksum_counter;
1455                 u32        server_cksum = body->oa.o_cksum;
1456                 char      *via = "";
1457                 char      *router = "";
1458                 cksum_type_t cksum_type;
1459
1460                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1461                                                body->oa.o_flags : 0);
1462                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1463                                                  aa->aa_ppga, OST_READ,
1464                                                  cksum_type);
1465
1466                 if (peer->nid != req->rq_bulk->bd_sender) {
1467                         via = " via ";
1468                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1469                 }
1470
1471                 if (server_cksum != client_cksum) {
1472                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1473                                            "%s%s%s inode "DFID" object "DOSTID
1474                                            " extent ["LPU64"-"LPU64"]\n",
1475                                            req->rq_import->imp_obd->obd_name,
1476                                            libcfs_nid2str(peer->nid),
1477                                            via, router,
1478                                            body->oa.o_valid & OBD_MD_FLFID ?
1479                                                 body->oa.o_parent_seq : (__u64)0,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_oid : 0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_ver : 0,
1484                                            POSTID(&body->oa.o_oi),
1485                                            aa->aa_ppga[0]->off,
1486                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1487                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1488                                                                         1);
1489                         CERROR("client %x, server %x, cksum_type %x\n",
1490                                client_cksum, server_cksum, cksum_type);
1491                         cksum_counter = 0;
1492                         aa->aa_oa->o_cksum = client_cksum;
1493                         rc = -EAGAIN;
1494                 } else {
1495                         cksum_counter++;
1496                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1497                         rc = 0;
1498                 }
1499         } else if (unlikely(client_cksum)) {
1500                 static int cksum_missed;
1501
1502                 cksum_missed++;
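                /* (x & -x) == x holds only when x is a power of two, so this
                 * console message rate-limits itself exponentially (logged on
                 * miss 1, 2, 4, 8, ...) */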
1503                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1504                         CERROR("Checksum %u requested from %s but not sent\n",
1505                                cksum_missed, libcfs_nid2str(peer->nid));
1506         } else {
1507                 rc = 0;
1508         }
1509 out:
1510         if (rc >= 0)
1511                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1512                                      aa->aa_oa, &body->oa);
1513
1514         RETURN(rc);
1515 }
1516
1517 static int osc_brw_redo_request(struct ptlrpc_request *request,
1518                                 struct osc_brw_async_args *aa, int rc)
1519 {
1520         struct ptlrpc_request *new_req;
1521         struct osc_brw_async_args *new_aa;
1522         struct osc_async_page *oap;
1523         ENTRY;
1524
1525         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1526                   "redo for recoverable error %d", rc);
1527
1528         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1529                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1530                                   aa->aa_cli, aa->aa_oa,
1531                                   NULL /* lsm unused by osc currently */,
1532                                   aa->aa_page_count, aa->aa_ppga,
1533                                   &new_req, aa->aa_ocapa, 0, 1);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1538                 if (oap->oap_request != NULL) {
1539                         LASSERTF(request == oap->oap_request,
1540                                  "request %p != oap_request %p\n",
1541                                  request, oap->oap_request);
1542                         if (oap->oap_interrupted) {
1543                                 ptlrpc_req_finished(new_req);
1544                                 RETURN(-EINTR);
1545                         }
1546                 }
1547         }
1548         /* New request takes over pga and oaps from old request.
1549          * Note that copying a list_head doesn't work, need to move it... */
1550         aa->aa_resends++;
1551         new_req->rq_interpret_reply = request->rq_interpret_reply;
1552         new_req->rq_async_args = request->rq_async_args;
1553         new_req->rq_commit_cb = request->rq_commit_cb;
1554         /* cap the resend delay to the current request timeout, similar to
1555          * what ptlrpc does (see after_reply()) */
1556         if (aa->aa_resends > new_req->rq_timeout)
1557                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558         else
1559                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1560         new_req->rq_generation_set = 1;
1561         new_req->rq_import_generation = request->rq_import_generation;
1562
1563         new_aa = ptlrpc_req_async_args(new_req);
1564
1565         INIT_LIST_HEAD(&new_aa->aa_oaps);
1566         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1567         INIT_LIST_HEAD(&new_aa->aa_exts);
1568         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1569         new_aa->aa_resends = aa->aa_resends;
1570
1571         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1572                 if (oap->oap_request) {
1573                         ptlrpc_req_finished(oap->oap_request);
1574                         oap->oap_request = ptlrpc_request_addref(new_req);
1575                 }
1576         }
1577
1578         new_aa->aa_ocapa = aa->aa_ocapa;
1579         aa->aa_ocapa = NULL;
1580
1581         /* XXX: This code will run into problems if we are ever to support
1582          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1583          * and waiting for all of them to finish. We should inherit the
1584          * request set from the old request. */
1585         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1586
1587         DEBUG_REQ(D_INFO, new_req, "new request");
1588         RETURN(0);
1589 }
1590
1591 /*
1592  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1593  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1594  * fine for our small page arrays and doesn't require allocation.  It's an
1595  * insertion sort that swaps elements that are strides apart, shrinking the
1596  * stride down until it's 1 and the array is sorted.
1597  */
1598 static void sort_brw_pages(struct brw_page **array, int num)
1599 {
1600         int stride, i, j;
1601         struct brw_page *tmp;
1602
1603         if (num == 1)
1604                 return;
1605         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1606                 ;
1607
1608         do {
1609                 stride /= 3;
1610                 for (i = stride ; i < num ; i++) {
1611                         tmp = array[i];
1612                         j = i;
1613                         while (j >= stride && array[j - stride]->off > tmp->off) {
1614                                 array[j] = array[j - stride];
1615                                 j -= stride;
1616                         }
1617                         array[j] = tmp;
1618                 }
1619         } while (stride > 1);
1620 }
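
/*
 * Illustration (added annotation, not in the original source): for num = 20
 * the first loop above builds the gap sequence 1, 4, 13, 40 and stops at 40;
 * the do/while then sorts with strides 13, 4 and 1, e.g.:
 *
 *	offsets in:  { 12, 3, 7, 0, 9 }  ->  offsets out: { 0, 3, 7, 9, 12 }
 *
 * osc_build_rpc() below relies on this so the niobufs handed to the OST are
 * in ascending offset order.
 */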
1621
1622 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1623 {
1624         LASSERT(ppga != NULL);
1625         OBD_FREE(ppga, sizeof(*ppga) * count);
1626 }
1627
1628 static int brw_interpret(const struct lu_env *env,
1629                          struct ptlrpc_request *req, void *data, int rc)
1630 {
1631         struct osc_brw_async_args *aa = data;
1632         struct osc_extent *ext;
1633         struct osc_extent *tmp;
1634         struct client_obd *cli = aa->aa_cli;
1635         ENTRY;
1636
1637         rc = osc_brw_fini_request(req, rc);
1638         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1639         /* When the server returns -EINPROGRESS, the client should always
1640          * retry, regardless of how many times the bulk was already resent. */
1641         if (osc_recoverable_error(rc)) {
1642                 if (req->rq_import_generation !=
1643                     req->rq_import->imp_generation) {
1644                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1645                                ""DOSTID", rc = %d.\n",
1646                                req->rq_import->imp_obd->obd_name,
1647                                POSTID(&aa->aa_oa->o_oi), rc);
1648                 } else if (rc == -EINPROGRESS ||
1649                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1650                         rc = osc_brw_redo_request(req, aa, rc);
1651                 } else {
1652                         CERROR("%s: too many resent retries for object: "
1653                                ""LPU64":"LPU64", rc = %d.\n",
1654                                req->rq_import->imp_obd->obd_name,
1655                                POSTID(&aa->aa_oa->o_oi), rc);
1656                 }
1657
1658                 if (rc == 0)
1659                         RETURN(0);
1660                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1661                         rc = -EIO;
1662         }
1663
1664         if (aa->aa_ocapa) {
1665                 capa_put(aa->aa_ocapa);
1666                 aa->aa_ocapa = NULL;
1667         }
1668
1669         if (rc == 0) {
1670                 struct obdo *oa = aa->aa_oa;
1671                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1672                 unsigned long valid = 0;
1673                 struct cl_object *obj;
1674                 struct osc_async_page *last;
1675
1676                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1677                 obj = osc2cl(last->oap_obj);
1678
1679                 cl_object_attr_lock(obj);
1680                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1681                         attr->cat_blocks = oa->o_blocks;
1682                         valid |= CAT_BLOCKS;
1683                 }
1684                 if (oa->o_valid & OBD_MD_FLMTIME) {
1685                         attr->cat_mtime = oa->o_mtime;
1686                         valid |= CAT_MTIME;
1687                 }
1688                 if (oa->o_valid & OBD_MD_FLATIME) {
1689                         attr->cat_atime = oa->o_atime;
1690                         valid |= CAT_ATIME;
1691                 }
1692                 if (oa->o_valid & OBD_MD_FLCTIME) {
1693                         attr->cat_ctime = oa->o_ctime;
1694                         valid |= CAT_CTIME;
1695                 }
1696
1697                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1698                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1699                         loff_t last_off = last->oap_count + last->oap_obj_off +
1700                                 last->oap_page_off;
1701
1702                         /* Change the file size if this is an out-of-quota
1703                          * or direct I/O write and it extends the file size */
1704                         if (loi->loi_lvb.lvb_size < last_off) {
1705                                 attr->cat_size = last_off;
1706                                 valid |= CAT_SIZE;
1707                         }
1708                         /* Extend KMS if it's not a lockless write */
1709                         if (loi->loi_kms < last_off &&
1710                             oap2osc_page(last)->ops_srvlock == 0) {
1711                                 attr->cat_kms = last_off;
1712                                 valid |= CAT_KMS;
1713                         }
1714                 }
1715
1716                 if (valid != 0)
1717                         cl_object_attr_update(env, obj, attr, valid);
1718                 cl_object_attr_unlock(obj);
1719         }
1720         OBDO_FREE(aa->aa_oa);
1721
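        /* pages written by this RPC stay accounted as "unstable" until the
         * OST commits the transaction; the matching decrement is done via
         * osc_dec_unstable_pages(), see brw_commit() below */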
1722         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1723                 osc_inc_unstable_pages(req);
1724
1725         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1726                 list_del_init(&ext->oe_link);
1727                 osc_extent_finish(env, ext, 1, rc);
1728         }
1729         LASSERT(list_empty(&aa->aa_exts));
1730         LASSERT(list_empty(&aa->aa_oaps));
1731
1732         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1733                           req->rq_bulk->bd_nob_transferred);
1734         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1735         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1736
1737         spin_lock(&cli->cl_loi_list_lock);
1738         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1739          * is called so we know whether to go to sync BRWs or wait for more
1740          * RPCs to complete */
1741         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1742                 cli->cl_w_in_flight--;
1743         else
1744                 cli->cl_r_in_flight--;
1745         osc_wake_cache_waiters(cli);
1746         spin_unlock(&cli->cl_loi_list_lock);
1747
1748         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1749         RETURN(rc);
1750 }
1751
1752 static void brw_commit(struct ptlrpc_request *req)
1753 {
1754         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1755          * this function, called via rq_commit_cb, we need to ensure that
1756          * osc_dec_unstable_pages is still called. Otherwise unstable
1757          * pages may be leaked. */
1758         spin_lock(&req->rq_lock);
1759         if (likely(req->rq_unstable)) {
1760                 req->rq_unstable = 0;
1761                 spin_unlock(&req->rq_lock);
1762
1763                 osc_dec_unstable_pages(req);
1764         } else {
1765                 req->rq_committed = 1;
1766                 spin_unlock(&req->rq_lock);
1767         }
1768 }
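
/*
 * Sketch of the rq_unstable handshake (hedged annotation; the increment side
 * lives in osc_inc_unstable_pages()): if the pages were already accounted as
 * unstable, the commit callback clears rq_unstable and drops the count here;
 * otherwise it only marks rq_committed, and the accounting side is expected
 * to notice the committed flag so the decrement still happens exactly once.
 */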
1769
1770 /**
1771  * Build an RPC from the list of extents @ext_list. The caller must ensure
1772  * that the total number of pages in this list does not exceed the maximum
1773  * number of pages per RPC. Extents in the list must be in OES_RPC state.
1774  */
1775 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1776                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1777 {
1778         struct ptlrpc_request           *req = NULL;
1779         struct osc_extent               *ext;
1780         struct brw_page                 **pga = NULL;
1781         struct osc_brw_async_args       *aa = NULL;
1782         struct obdo                     *oa = NULL;
1783         struct osc_async_page           *oap;
1784         struct osc_async_page           *tmp;
1785         struct cl_req                   *clerq = NULL;
1786         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1787                                                                       CRT_READ;
1788         struct cl_req_attr              *crattr = NULL;
1789         obd_off                         starting_offset = OBD_OBJECT_EOF;
1790         obd_off                         ending_offset = 0;
1791         int                             mpflag = 0;
1792         int                             mem_tight = 0;
1793         int                             page_count = 0;
1794         bool                            soft_sync = false;
1795         int                             i;
1796         int                             rc;
1797         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1798         struct ost_body                 *body;
1799         ENTRY;
1800         LASSERT(!list_empty(ext_list));
1801
1802         /* add pages into rpc_list to build BRW rpc */
1803         list_for_each_entry(ext, ext_list, oe_link) {
1804                 LASSERT(ext->oe_state == OES_RPC);
1805                 mem_tight |= ext->oe_memalloc;
1806                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1807                         ++page_count;
1808                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1809                         if (starting_offset > oap->oap_obj_off)
1810                                 starting_offset = oap->oap_obj_off;
1811                         else
1812                                 LASSERT(oap->oap_page_off == 0);
1813                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1814                                 ending_offset = oap->oap_obj_off +
1815                                                 oap->oap_count;
1816                         else
1817                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1818                                         PAGE_CACHE_SIZE);
1819                 }
1820         }
1821
1822         soft_sync = osc_over_unstable_soft_limit(cli);
1823         if (mem_tight)
1824                 mpflag = cfs_memory_pressure_get_and_set();
1825
1826         OBD_ALLOC(crattr, sizeof(*crattr));
1827         if (crattr == NULL)
1828                 GOTO(out, rc = -ENOMEM);
1829
1830         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1831         if (pga == NULL)
1832                 GOTO(out, rc = -ENOMEM);
1833
1834         OBDO_ALLOC(oa);
1835         if (oa == NULL)
1836                 GOTO(out, rc = -ENOMEM);
1837
1838         i = 0;
1839         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1840                 struct cl_page *page = oap2cl_page(oap);
1841                 if (clerq == NULL) {
1842                         clerq = cl_req_alloc(env, page, crt,
1843                                              1 /* only 1-object rpcs for now */);
1844                         if (IS_ERR(clerq))
1845                                 GOTO(out, rc = PTR_ERR(clerq));
1846                 }
1847                 if (mem_tight)
1848                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1849                 if (soft_sync)
1850                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1851                 pga[i] = &oap->oap_brw_page;
1852                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1853                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1854                        pga[i]->pg, page_index(oap->oap_page), oap,
1855                        pga[i]->flag);
1856                 i++;
1857                 cl_req_page_add(env, clerq, page);
1858         }
1859
1860         /* always get the data for the obdo for the rpc */
1861         LASSERT(clerq != NULL);
1862         crattr->cra_oa = oa;
1863         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1864
1865         rc = cl_req_prep(env, clerq);
1866         if (rc != 0) {
1867                 CERROR("cl_req_prep failed: %d\n", rc);
1868                 GOTO(out, rc);
1869         }
1870
1871         sort_brw_pages(pga, page_count);
1872         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1873                         pga, &req, crattr->cra_capa, 1, 0);
1874         if (rc != 0) {
1875                 CERROR("prep_req failed: %d\n", rc);
1876                 GOTO(out, rc);
1877         }
1878
1879         req->rq_commit_cb = brw_commit;
1880         req->rq_interpret_reply = brw_interpret;
1881
1882         if (mem_tight != 0)
1883                 req->rq_memalloc = 1;
1884
1885         /* Need to update the timestamps after the request is built in case
1886          * we race with setattr (locally or in the queue at the OST).  If the
1887          * OST gets a later setattr before an earlier BRW (as determined by
1888          * the request xid), the OST will not use the BRW timestamps.  Sadly,
1889          * there is no obvious way to do this in a single call.  bug 10150 */
1890         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1891         crattr->cra_oa = &body->oa;
1892         cl_req_attr_set(env, clerq, crattr,
1893                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1894
1895         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1896
1897         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1898         aa = ptlrpc_req_async_args(req);
1899         INIT_LIST_HEAD(&aa->aa_oaps);
1900         list_splice_init(&rpc_list, &aa->aa_oaps);
1901         INIT_LIST_HEAD(&aa->aa_exts);
1902         list_splice_init(ext_list, &aa->aa_exts);
1903         aa->aa_clerq = clerq;
1904
1905         /* Queued sync pages can be torn down while the pages
1906          * are between the pending list and the RPC */
1907         tmp = NULL;
1908         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1909                 /* only one oap gets a request reference */
1910                 if (tmp == NULL)
1911                         tmp = oap;
1912                 if (oap->oap_interrupted && !req->rq_intr) {
1913                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1914                                         oap, req);
1915                         ptlrpc_mark_interrupted(req);
1916                 }
1917         }
1918         if (tmp != NULL)
1919                 tmp->oap_request = ptlrpc_request_addref(req);
1920
1921         spin_lock(&cli->cl_loi_list_lock);
1922         starting_offset >>= PAGE_CACHE_SHIFT;
1923         if (cmd == OBD_BRW_READ) {
1924                 cli->cl_r_in_flight++;
1925                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1926                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1927                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1928                                       starting_offset + 1);
1929         } else {
1930                 cli->cl_w_in_flight++;
1931                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1932                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1933                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1934                                       starting_offset + 1);
1935         }
1936         spin_unlock(&cli->cl_loi_list_lock);
1937
1938         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1939                   page_count, aa, cli->cl_r_in_flight,
1940                   cli->cl_w_in_flight);
1941
1942         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1943          * see which CPU/NUMA node the majority of pages were allocated
1944          * on, and try to assign the async RPC to that CPU core
1945          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1946          *
1947          * But on the other hand, we expect that multiple ptlrpcd
1948          * threads and the initial write sponsor can run in parallel,
1949          * especially when data checksumming is enabled, which is a
1950          * CPU-bound operation that a single ptlrpcd thread cannot
1951          * process in time.  So more ptlrpcd threads sharing the BRW
1952          * load (with PDL_POLICY_ROUND) seems better.
1953          */
1954         ptlrpcd_add_req(req, pol, -1);
1955         rc = 0;
1956         EXIT;
1957
1958 out:
1959         if (mem_tight != 0)
1960                 cfs_memory_pressure_restore(mpflag);
1961
1962         if (crattr != NULL) {
1963                 capa_put(crattr->cra_capa);
1964                 OBD_FREE(crattr, sizeof(*crattr));
1965         }
1966
1967         if (rc != 0) {
1968                 LASSERT(req == NULL);
1969
1970                 if (oa)
1971                         OBDO_FREE(oa);
1972                 if (pga)
1973                         OBD_FREE(pga, sizeof(*pga) * page_count);
1974                 /* this should happen rarely and is pretty bad; it makes
1975                  * the pending list not follow the dirty order */
1976                 while (!list_empty(ext_list)) {
1977                         ext = list_entry(ext_list->next, struct osc_extent,
1978                                          oe_link);
1979                         list_del_init(&ext->oe_link);
1980                         osc_extent_finish(env, ext, 0, rc);
1981                 }
1982                 if (clerq && !IS_ERR(clerq))
1983                         cl_req_completion(env, clerq, rc);
1984         }
1985         RETURN(rc);
1986 }
1987
1988 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1989                                         struct ldlm_enqueue_info *einfo)
1990 {
1991         void *data = einfo->ei_cbdata;
1992         int set = 0;
1993
1994         LASSERT(lock != NULL);
1995         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1996         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1997         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1998         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1999
2000         lock_res_and_lock(lock);
2001
2002         if (lock->l_ast_data == NULL)
2003                 lock->l_ast_data = data;
2004         if (lock->l_ast_data == data)
2005                 set = 1;
2006
2007         unlock_res_and_lock(lock);
2008
2009         return set;
2010 }
2011
2012 static int osc_set_data_with_check(struct lustre_handle *lockh,
2013                                    struct ldlm_enqueue_info *einfo)
2014 {
2015         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2016         int set = 0;
2017
2018         if (lock != NULL) {
2019                 set = osc_set_lock_data_with_check(lock, einfo);
2020                 LDLM_LOCK_PUT(lock);
2021         } else
2022                 CERROR("lockh %p, data %p - client evicted?\n",
2023                        lockh, einfo->ei_cbdata);
2024         return set;
2025 }
2026
2027 static int osc_enqueue_fini(struct ptlrpc_request *req,
2028                             osc_enqueue_upcall_f upcall, void *cookie,
2029                             struct lustre_handle *lockh, ldlm_mode_t mode,
2030                             __u64 *flags, int agl, int errcode)
2031 {
2032         bool intent = *flags & LDLM_FL_HAS_INTENT;
2033         int rc;
2034         ENTRY;
2035
2036         /* The request was created before the ldlm_cli_enqueue() call. */
2037         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2038                 struct ldlm_reply *rep;
2039
2040                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2041                 LASSERT(rep != NULL);
2042
2043                 rep->lock_policy_res1 =
2044                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2045                 if (rep->lock_policy_res1)
2046                         errcode = rep->lock_policy_res1;
2047                 if (!agl)
2048                         *flags |= LDLM_FL_LVB_READY;
2049         } else if (errcode == ELDLM_OK) {
2050                 *flags |= LDLM_FL_LVB_READY;
2051         }
2052
2053         /* Call the update callback. */
2054         rc = (*upcall)(cookie, lockh, errcode);
2055
2056         /* release the reference taken in ldlm_cli_enqueue() */
2057         if (errcode == ELDLM_LOCK_MATCHED)
2058                 errcode = ELDLM_OK;
2059         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2060                 ldlm_lock_decref(lockh, mode);
2061
2062         RETURN(rc);
2063 }
2064
2065 static int osc_enqueue_interpret(const struct lu_env *env,
2066                                  struct ptlrpc_request *req,
2067                                  struct osc_enqueue_args *aa, int rc)
2068 {
2069         struct ldlm_lock *lock;
2070         struct lustre_handle *lockh = &aa->oa_lockh;
2071         ldlm_mode_t mode = aa->oa_mode;
2072         struct ost_lvb *lvb = aa->oa_lvb;
2073         __u32 lvb_len = sizeof(*lvb);
2074         __u64 flags = 0;
2075
2076         ENTRY;
2077
2078         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2079          * be valid. */
2080         lock = ldlm_handle2lock(lockh);
2081         LASSERTF(lock != NULL,
2082                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2083                  lockh->cookie, req, aa);
2084
2085         /* Take an additional reference so that a blocking AST that
2086          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2087          * to arrive after an upcall has been executed by
2088          * osc_enqueue_fini(). */
2089         ldlm_lock_addref(lockh, mode);
2090
2091         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2092         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2093
2094         /* Let the CP AST grant the lock first. */
2095         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2096
2097         if (aa->oa_agl) {
2098                 LASSERT(aa->oa_lvb == NULL);
2099                 LASSERT(aa->oa_flags == NULL);
2100                 aa->oa_flags = &flags;
2101         }
2102
2103         /* Complete obtaining the lock procedure. */
2104         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2105                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2106                                    lockh, rc);
2107         /* Complete osc stuff. */
2108         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2109                               aa->oa_flags, aa->oa_agl, rc);
2110
2111         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2112
2113         ldlm_lock_decref(lockh, mode);
2114         LDLM_LOCK_PUT(lock);
2115         RETURN(rc);
2116 }
2117
2118 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2119
2120 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2121  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2122  * other synchronous requests; however, keeping some locks while trying to
2123  * obtain others may take a considerable amount of time in case of OST failure,
2124  * and when other sync requests cannot get a lock released by a client, that
2125  * client is evicted from the cluster -- such scenarios make life difficult, so
2126  * release locks just after they are obtained. */
2127 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2128                      __u64 *flags, ldlm_policy_data_t *policy,
2129                      struct ost_lvb *lvb, int kms_valid,
2130                      osc_enqueue_upcall_f upcall, void *cookie,
2131                      struct ldlm_enqueue_info *einfo,
2132                      struct ptlrpc_request_set *rqset, int async, int agl)
2133 {
2134         struct obd_device *obd = exp->exp_obd;
2135         struct lustre_handle lockh = { 0 };
2136         struct ptlrpc_request *req = NULL;
2137         int intent = *flags & LDLM_FL_HAS_INTENT;
2138         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2139         ldlm_mode_t mode;
2140         int rc;
2141         ENTRY;
2142
2143         /* Filesystem lock extents are extended to page boundaries so that
2144          * dealing with the page cache is a little smoother.  */
2145         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2146         policy->l_extent.end |= ~CFS_PAGE_MASK;
2147
2148         /*
2149          * kms is not valid when either object is completely fresh (so that no
2150          * locks are cached), or object was evicted. In the latter case cached
2151          * lock cannot be used, because it would prime inode state with
2152          * potentially stale LVB.
2153          */
2154         if (!kms_valid)
2155                 goto no_match;
2156
2157         /* Next, search for already existing extent locks that will cover us */
2158         /* If we're trying to read, we also search for an existing PW lock.  The
2159          * VFS and page cache already protect us locally, so lots of readers/
2160          * writers can share a single PW lock.
2161          *
2162          * There are problems with conversion deadlocks, so instead of
2163          * converting a read lock to a write lock, we'll just enqueue a new
2164          * one.
2165          *
2166          * At some point we should cancel the read lock instead of making them
2167          * send us a blocking callback, but there are problems with canceling
2168          * locks out from other users right now, too. */
2169         mode = einfo->ei_mode;
2170         if (einfo->ei_mode == LCK_PR)
2171                 mode |= LCK_PW;
2172         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2173                                einfo->ei_type, policy, mode, &lockh, 0);
2174         if (mode) {
2175                 struct ldlm_lock *matched;
2176
2177                 if (*flags & LDLM_FL_TEST_LOCK)
2178                         RETURN(ELDLM_OK);
2179
2180                 matched = ldlm_handle2lock(&lockh);
2181                 if (agl) {
2182                         /* AGL enqueues DLM locks speculatively. Therefore,
2183                          * if a DLM lock already exists, just inform the
2184                          * caller to cancel the AGL process for this stripe. */
2185                         ldlm_lock_decref(&lockh, mode);
2186                         LDLM_LOCK_PUT(matched);
2187                         RETURN(-ECANCELED);
2188                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2189                         *flags |= LDLM_FL_LVB_READY;
2190
2191                         /* We already have a lock, and it's referenced. */
2192                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2193
2194                         ldlm_lock_decref(&lockh, mode);
2195                         LDLM_LOCK_PUT(matched);
2196                         RETURN(ELDLM_OK);
2197                 } else {
2198                         ldlm_lock_decref(&lockh, mode);
2199                         LDLM_LOCK_PUT(matched);
2200                 }
2201         }
2202
2203 no_match:
2204         if (*flags & LDLM_FL_TEST_LOCK)
2205                 RETURN(-ENOLCK);
2206
2207         if (intent) {
2208                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2209                                            &RQF_LDLM_ENQUEUE_LVB);
2210                 if (req == NULL)
2211                         RETURN(-ENOMEM);
2212
2213                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2214                 if (rc < 0) {
2215                         ptlrpc_request_free(req);
2216                         RETURN(rc);
2217                 }
2218
2219                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2220                                      sizeof(*lvb));
2221                 ptlrpc_request_set_replen(req);
2222         }
2223
2224         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2225         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2226
2227         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2228                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2229         if (async) {
2230                 if (!rc) {
2231                         struct osc_enqueue_args *aa;
2232                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2233                         aa = ptlrpc_req_async_args(req);
2234                         aa->oa_exp    = exp;
2235                         aa->oa_mode   = einfo->ei_mode;
2236                         aa->oa_type   = einfo->ei_type;
2237                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2238                         aa->oa_upcall = upcall;
2239                         aa->oa_cookie = cookie;
2240                         aa->oa_agl    = !!agl;
2241                         if (!agl) {
2242                                 aa->oa_flags  = flags;
2243                                 aa->oa_lvb    = lvb;
2244                         } else {
2245                                 /* AGL is essentially enqueueing a DLM lock
2246                                  * in advance, so we don't care about the
2247                                  * result of the AGL enqueue. */
2248                                 aa->oa_lvb    = NULL;
2249                                 aa->oa_flags  = NULL;
2250                         }
2251
2252                         req->rq_interpret_reply =
2253                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2254                         if (rqset == PTLRPCD_SET)
2255                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2256                         else
2257                                 ptlrpc_set_add_req(rqset, req);
2258                 } else if (intent) {
2259                         ptlrpc_req_finished(req);
2260                 }
2261                 RETURN(rc);
2262         }
2263
2264         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2265                               flags, agl, rc);
2266         if (intent)
2267                 ptlrpc_req_finished(req);
2268
2269         RETURN(rc);
2270 }
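
/*
 * Usage sketch (hedged, not from the original source; my_upcall/my_cookie are
 * caller-provided names): a typical async caller lets the shared ptlrpcd set
 * drive the RPC and receives the result through its upcall:
 *
 *	rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
 *			      my_upcall, my_cookie, &einfo, PTLRPCD_SET,
 *			      1, 0);	(async = 1, agl = 0)
 *
 * With errcode == ELDLM_OK the lock reference taken in ldlm_cli_enqueue() is
 * dropped by osc_enqueue_fini() right after the upcall runs.
 */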
2271
2272 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2273                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2274                    __u64 *flags, void *data, struct lustre_handle *lockh,
2275                    int unref)
2276 {
2277         struct obd_device *obd = exp->exp_obd;
2278         __u64 lflags = *flags;
2279         ldlm_mode_t rc;
2280         ENTRY;
2281
2282         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2283                 RETURN(-EIO);
2284
2285         /* Filesystem lock extents are extended to page boundaries so that
2286          * dealing with the page cache is a little smoother */
2287         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2288         policy->l_extent.end |= ~CFS_PAGE_MASK;
2289
2290         /* Next, search for already existing extent locks that will cover us */
2291         /* If we're trying to read, we also search for an existing PW lock.  The
2292          * VFS and page cache already protect us locally, so lots of readers/
2293          * writers can share a single PW lock. */
2294         rc = mode;
2295         if (mode == LCK_PR)
2296                 rc |= LCK_PW;
2297         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2298                              res_id, type, policy, rc, lockh, unref);
2299         if (rc) {
2300                 if (data != NULL) {
2301                         if (!osc_set_data_with_check(lockh, data)) {
2302                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2303                                         ldlm_lock_decref(lockh, rc);
2304                                 RETURN(0);
2305                         }
2306                 }
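                /* a PR request may have matched a PW lock (the match mode was
                 * widened to PR|PW above): move our reference from PW to PR */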
2307                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2308                         ldlm_lock_addref(lockh, LCK_PR);
2309                         ldlm_lock_decref(lockh, LCK_PW);
2310                 }
2311                 RETURN(rc);
2312         }
2313         RETURN(rc);
2314 }
2315
2316 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2317 {
2318         ENTRY;
2319
2320         if (unlikely(mode == LCK_GROUP))
2321                 ldlm_lock_decref_and_cancel(lockh, mode);
2322         else
2323                 ldlm_lock_decref(lockh, mode);
2324
2325         RETURN(0);
2326 }
2327
2328 static int osc_statfs_interpret(const struct lu_env *env,
2329                                 struct ptlrpc_request *req,
2330                                 struct osc_async_args *aa, int rc)
2331 {
2332         struct obd_statfs *msfs;
2333         ENTRY;
2334
2335         if (rc == -EBADR)
2336                 /* The request has in fact never been sent
2337                  * due to issues at a higher level (LOV).
2338                  * Exit immediately since the caller is
2339                  * aware of the problem and takes care
2340                  * of the cleanup */
2341                 RETURN(rc);
2342
2343         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2344             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2345                 GOTO(out, rc = 0);
2346
2347         if (rc != 0)
2348                 GOTO(out, rc);
2349
2350         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2351         if (msfs == NULL) {
2352                 GOTO(out, rc = -EPROTO);
2353         }
2354
2355         *aa->aa_oi->oi_osfs = *msfs;
2356 out:
2357         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2358         RETURN(rc);
2359 }
2360
2361 static int osc_statfs_async(struct obd_export *exp,
2362                             struct obd_info *oinfo, __u64 max_age,
2363                             struct ptlrpc_request_set *rqset)
2364 {
2365         struct obd_device     *obd = class_exp2obd(exp);
2366         struct ptlrpc_request *req;
2367         struct osc_async_args *aa;
2368         int                    rc;
2369         ENTRY;
2370
2371         /* We could possibly pass max_age in the request (as an absolute
2372          * timestamp or a "seconds.usec ago") so the target can avoid doing
2373          * extra calls into the filesystem if that isn't necessary (e.g.
2374          * during mount that would help a bit).  Having relative timestamps
2375          * is not so great if request processing is slow, while absolute
2376          * timestamps are not ideal because they need time synchronization. */
2377         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2378         if (req == NULL)
2379                 RETURN(-ENOMEM);
2380
2381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2382         if (rc) {
2383                 ptlrpc_request_free(req);
2384                 RETURN(rc);
2385         }
2386         ptlrpc_request_set_replen(req);
2387         req->rq_request_portal = OST_CREATE_PORTAL;
2388         ptlrpc_at_set_req_timeout(req);
2389
2390         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2391                 /* procfs requests should not be delayed or resent, to avoid deadlock */
2392                 req->rq_no_resend = 1;
2393                 req->rq_no_delay = 1;
2394         }
2395
2396         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2397         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2398         aa = ptlrpc_req_async_args(req);
2399         aa->aa_oi = oinfo;
2400
2401         ptlrpc_set_add_req(rqset, req);
2402         RETURN(0);
2403 }
2404
2405 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2406                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2407 {
2408         struct obd_device     *obd = class_exp2obd(exp);
2409         struct obd_statfs     *msfs;
2410         struct ptlrpc_request *req;
2411         struct obd_import     *imp = NULL;
2412         int rc;
2413         ENTRY;
2414
2415         /* Since the request might also come from lprocfs, we need to
2416          * sync this with client_disconnect_export() (bug 15684) */
2417         down_read(&obd->u.cli.cl_sem);
2418         if (obd->u.cli.cl_import)
2419                 imp = class_import_get(obd->u.cli.cl_import);
2420         up_read(&obd->u.cli.cl_sem);
2421         if (!imp)
2422                 RETURN(-ENODEV);
2423
2424         /* We could possibly pass max_age in the request (as an absolute
2425          * timestamp or a "seconds.usec ago") so the target can avoid doing
2426          * extra calls into the filesystem if that isn't necessary (e.g.
2427          * during mount that would help a bit).  Having relative timestamps
2428          * is not so great if request processing is slow, while absolute
2429          * timestamps are not ideal because they need time synchronization. */
2430         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2431
2432         class_import_put(imp);
2433
2434         if (req == NULL)
2435                 RETURN(-ENOMEM);
2436
2437         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2438         if (rc) {
2439                 ptlrpc_request_free(req);
2440                 RETURN(rc);
2441         }
2442         ptlrpc_request_set_replen(req);
2443         req->rq_request_portal = OST_CREATE_PORTAL;
2444         ptlrpc_at_set_req_timeout(req);
2445
2446         if (flags & OBD_STATFS_NODELAY) {
2447                 /* procfs requests should not be delayed or resent, to avoid deadlock */
2448                 req->rq_no_resend = 1;
2449                 req->rq_no_delay = 1;
2450         }
2451
2452         rc = ptlrpc_queue_wait(req);
2453         if (rc)
2454                 GOTO(out, rc);
2455
2456         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2457         if (msfs == NULL) {
2458                 GOTO(out, rc = -EPROTO);
2459         }
2460
2461         *osfs = *msfs;
2462
2463         EXIT;
2464  out:
2465         ptlrpc_req_finished(req);
2466         return rc;
2467 }
2468
2469 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2470                          void *karg, void *uarg)
2471 {
2472         struct obd_device *obd = exp->exp_obd;
2473         struct obd_ioctl_data *data = karg;
2474         int err = 0;
2475         ENTRY;
2476
2477         if (!try_module_get(THIS_MODULE)) {
2478                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2479                        module_name(THIS_MODULE));
2480                 return -EINVAL;
2481         }
2482         switch (cmd) {
2483         case OBD_IOC_CLIENT_RECOVER:
2484                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2485                                             data->ioc_inlbuf1, 0);
2486                 if (err > 0)
2487                         err = 0;
2488                 GOTO(out, err);
2489         case IOC_OSC_SET_ACTIVE:
2490                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2491                                                data->ioc_offset);
2492                 GOTO(out, err);
2493         case OBD_IOC_POLL_QUOTACHECK:
2494                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2495                 GOTO(out, err);
2496         case OBD_IOC_PING_TARGET:
2497                 err = ptlrpc_obd_ping(obd);
2498                 GOTO(out, err);
2499         default:
2500                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2501                        cmd, current_comm());
2502                 GOTO(out, err = -ENOTTY);
2503         }
2504 out:
2505         module_put(THIS_MODULE);
2506         return err;
2507 }
2508
2509 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2510                         obd_count keylen, void *key, __u32 *vallen, void *val,
2511                         struct lov_stripe_md *lsm)
2512 {
2513         ENTRY;
2514         if (!vallen || !val)
2515                 RETURN(-EFAULT);
2516
2517         if (KEY_IS(KEY_FIEMAP)) {
2518                 struct ll_fiemap_info_key *fm_key =
2519                                 (struct ll_fiemap_info_key *)key;
2520                 struct ldlm_res_id       res_id;
2521                 ldlm_policy_data_t       policy;
2522                 struct lustre_handle     lockh;
2523                 ldlm_mode_t              mode = 0;
2524                 struct ptlrpc_request   *req;
2525                 struct ll_user_fiemap   *reply;
2526                 char                    *tmp;
2527                 int                      rc;
2528
2529                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2530                         goto skip_locking;
2531
2532                 policy.l_extent.start = fm_key->fiemap.fm_start &
2533                                                 CFS_PAGE_MASK;
2534
2535                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2536                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2537                         policy.l_extent.end = OBD_OBJECT_EOF;
2538                 else
2539                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2540                                 fm_key->fiemap.fm_length +
2541                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2542
2543                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2544                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2545                                        LDLM_FL_BLOCK_GRANTED |
2546                                        LDLM_FL_LVB_READY,
2547                                        &res_id, LDLM_EXTENT, &policy,
2548                                        LCK_PR | LCK_PW, &lockh, 0);
2549                 if (mode) { /* lock is cached on client */
2550                         if (mode != LCK_PR) {
2551                                 ldlm_lock_addref(&lockh, LCK_PR);
2552                                 ldlm_lock_decref(&lockh, LCK_PW);
2553                         }
2554                 } else { /* no cached lock; acquire it on the server side */
2555                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2556                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2557                 }
2558
2559 skip_locking:
2560                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2561                                            &RQF_OST_GET_INFO_FIEMAP);
2562                 if (req == NULL)
2563                         GOTO(drop_lock, rc = -ENOMEM);
2564
2565                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2566                                      RCL_CLIENT, keylen);
2567                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2568                                      RCL_CLIENT, *vallen);
2569                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2570                                      RCL_SERVER, *vallen);
2571
2572                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2573                 if (rc) {
2574                         ptlrpc_request_free(req);
2575                         GOTO(drop_lock, rc);
2576                 }
2577
2578                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2579                 memcpy(tmp, key, keylen);
2580                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2581                 memcpy(tmp, val, *vallen);
2582
2583                 ptlrpc_request_set_replen(req);
2584                 rc = ptlrpc_queue_wait(req);
2585                 if (rc)
2586                         GOTO(fini_req, rc);
2587
2588                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2589                 if (reply == NULL)
2590                         GOTO(fini_req, rc = -EPROTO);
2591
2592                 memcpy(val, reply, *vallen);
2593 fini_req:
2594                 ptlrpc_req_finished(req);
2595 drop_lock:
2596                 if (mode)
2597                         ldlm_lock_decref(&lockh, LCK_PR);
2598                 RETURN(rc);
2599         }
2600
2601         RETURN(-EINVAL);
2602 }
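
/*
 * Usage sketch (hedged annotation; field names on the caller's side are
 * assumptions): a FIEMAP caller fills a struct ll_fiemap_info_key with the
 * object id and the user's fiemap request, then calls down via obd_get_info():
 *
 *	fm_key.oa.o_oi = loi->loi_oi;
 *	fm_key.fiemap = *fiemap;
 *	rc = obd_get_info(env, exp, sizeof(fm_key), &fm_key,
 *			  &vallen, fiemap, lsm);
 *
 * FIEMAP_FLAG_SYNC in fm_flags makes the code above take (or request) a PR
 * lock first so the mapping reflects data flushed to the OST.
 */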
2603
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this OSC to the cache's LRU list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to the OST. Since nobody calls
           osc methods directly and everybody is supposed to go through LOV,
           we assume LOV validated the values for us. The only recognised
           values so far are evict_by_nid and mds_conn. Even if something bad
           goes through, we'd get a -EINVAL from the OST anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

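        /* A grant shrink RPC completes asynchronously, so stash a private
         * copy of the obdo from the ost_body; the interpret callback uses
         * it to refresh the client grant state from the reply. */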
        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        }

        RETURN(0);
}

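/**
 * Re-negotiate the grant at reconnect time.  Ask the server for the
 * grant this client believes it still holds, i.e.
 *
 *	ocd_grant = cl_avail_grant + (cl_dirty_pages << PAGE_CACHE_SHIFT)
 *
 * or, if that is zero, for twice the BRW size as a starting point.
 * Grant lost while the connection was down is reported and reset.
 */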
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                spin_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = (cli->cl_avail_grant +
                                  (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}

static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /*
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger triggers the shrink.
         * So the OSC should only be removed from the shrink list once we
         * are sure the import has been destroyed. See BUG 18662.
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}

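/**
 * React to import state changes: reset the grant counters on disconnect,
 * abort cached I/O and clean up the local lock namespace when the import
 * is invalidated, re-initialize the grant and the request portal once
 * the connect data is known, and forward the remaining events to the
 * observer (normally the LOV layer above).
 */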
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages will fail their RPCs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

/**
 * Determine whether a lock can be canceled before replaying it during
 * recovery; see bug 16774 for details.
 *
 * \retval zero   the lock cannot be canceled
 * \retval other  ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}

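/**
 * Writeback work callback, run from a ptlrpcd work item allocated in
 * osc_setup(): flush any queued BRW pages for this client.
 */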
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}

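/**
 * Set up an OSC device: take a ptlrpcd reference, run the generic
 * client setup, allocate the writeback and LRU work items, initialize
 * quota support and grant shrinking, register the proc tree (under the
 * OSP entry when client and server share a node), and pre-allocate a
 * request pool for brw_interpret().  Every failure path unwinds exactly
 * the steps that have already succeeded.
 */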
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If both the client (osc) and the server (osp) are on the same
         * node, the osp layer, if loaded first, will have registered the
         * osc proc directory. In that case this obd_device attaches its
         * proc tree to type->typ_procsym instead of
         * obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * let's do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few extra requests, because brw_interpret
         * tries to create new requests before freeing previous ones.
         * Ideally we would reserve 2 * max_rpcs_in_flight, but that might
         * waste too much RAM, so max_rpcs_in_flight + 2 is a guess that
         * should still work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}

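/**
 * Two-stage teardown: OBD_CLEANUP_EARLY deactivates the import and stops
 * pinging it; OBD_CLEANUP_EXPORTS destroys the work items, cleans up the
 * client import and unregisters the proc entries.
 */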
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
        }
        }
        RETURN(rc);
}

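/**
 * Final cleanup: detach this OSC from the shared client cache, free the
 * quota cache, run the generic client cleanup and drop the ptlrpcd
 * reference taken in osc_setup().
 */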
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* LRU cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

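/**
 * Apply an "osc.*" tunable from a configuration log by matching it
 * against the lprocfs variables; a positive return from
 * class_process_proc_param() means the parameter was consumed and is
 * mapped to success.
 */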
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);

        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

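/* Methods exported to the layers above (LOV, echo client) through the
 * generic OBD operations switch. */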
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};

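/**
 * Module entry point: initialize the OSC slab caches and register the
 * OSC device type.  Proc registration is skipped when the OSP module
 * has already created the shared "osc" proc entry (client and server
 * on the same node).
 */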
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        int rc;
        ENTRY;

        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that does not support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        RETURN(rc);
}

static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);