Whamcloud - gitweb
LU-5731 osp: flush async updates for osp_sync
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into a osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * Author: Di Wang <di.wang@intel.com>
57  * Author: Fan, Yong <fan.yong@intel.com>
58  */
59
60 #define DEBUG_SUBSYSTEM S_MDS
61
62 #include "osp_internal.h"
63
64 struct osp_async_update_args {
65         struct dt_update_request *oaua_update;
66         atomic_t                 *oaua_count;
67         wait_queue_head_t        *oaua_waitq;
68         bool                      oaua_flow_control;
69 };
70
71 struct osp_async_request {
72         /* list in the dt_update_request::dur_cb_items */
73         struct list_head                 oar_list;
74
75         /* The target of the async update request. */
76         struct osp_object               *oar_obj;
77
78         /* The data used by oar_interpreter. */
79         void                            *oar_data;
80
81         /* The interpreter function called after the async request handled. */
82         osp_async_request_interpreter_t  oar_interpreter;
83 };
84
85 /**
86  * Allocate an asynchronous request and initialize it with the given parameters.
87  *
88  * \param[in] obj               pointer to the operation target
89  * \param[in] data              pointer to the data used by the interpreter
90  * \param[in] interpreter       pointer to the interpreter function
91  *
92  * \retval                      pointer to the asychronous request
93  * \retval                      NULL if the allocation failed
94  */
95 static struct osp_async_request *
96 osp_async_request_init(struct osp_object *obj, void *data,
97                        osp_async_request_interpreter_t interpreter)
98 {
99         struct osp_async_request *oar;
100
101         OBD_ALLOC_PTR(oar);
102         if (oar == NULL)
103                 return NULL;
104
105         lu_object_get(osp2lu_obj(obj));
106         INIT_LIST_HEAD(&oar->oar_list);
107         oar->oar_obj = obj;
108         oar->oar_data = data;
109         oar->oar_interpreter = interpreter;
110
111         return oar;
112 }
113
114 /**
115  * Destroy the asychronous request.
116  *
117  * \param[in] env       pointer to the thread context
118  * \param[in] oar       pointer to asychronous request
119  */
120 static void osp_async_request_fini(const struct lu_env *env,
121                                    struct osp_async_request *oar)
122 {
123         LASSERT(list_empty(&oar->oar_list));
124
125         lu_object_put(env, osp2lu_obj(oar->oar_obj));
126         OBD_FREE_PTR(oar);
127 }
128
129 /**
130  * Interpret the packaged OUT RPC results.
131  *
132  * For every packaged sub-request, call its registered interpreter function.
133  * Then destroy the sub-request.
134  *
135  * \param[in] env       pointer to the thread context
136  * \param[in] req       pointer to the RPC
137  * \param[in] arg       pointer to data used by the interpreter
138  * \param[in] rc        the RPC return value
139  *
140  * \retval              0 for success
141  * \retval              negative error number on failure
142  */
143 static int osp_async_update_interpret(const struct lu_env *env,
144                                       struct ptlrpc_request *req,
145                                       void *arg, int rc)
146 {
147         struct object_update_reply      *reply  = NULL;
148         struct osp_async_update_args    *oaua   = arg;
149         struct dt_update_request        *dt_update = oaua->oaua_update;
150         struct osp_async_request        *oar;
151         struct osp_async_request        *next;
152         int                              count  = 0;
153         int                              index  = 0;
154         int                              rc1    = 0;
155
156         if (oaua->oaua_flow_control)
157                 obd_put_request_slot(
158                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
159
160         /* Unpack the results from the reply message. */
161         if (req->rq_repmsg != NULL) {
162                 reply = req_capsule_server_sized_get(&req->rq_pill,
163                                                      &RMF_OUT_UPDATE_REPLY,
164                                                      OUT_UPDATE_REPLY_SIZE);
165                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
166                         rc1 = -EPROTO;
167                 else
168                         count = reply->ourp_count;
169         } else {
170                 rc1 = rc;
171         }
172
173         list_for_each_entry_safe(oar, next, &dt_update->dur_cb_items,
174                                  oar_list) {
175                 list_del_init(&oar->oar_list);
176
177                 /* The peer may only have handled some requests (indicated
178                  * by the 'count') in the packaged OUT RPC, we can only get
179                  * results for the handled part. */
180                 if (index < count && reply->ourp_lens[index] > 0) {
181                         struct object_update_result *result;
182
183                         result = object_update_result_get(reply, index, NULL);
184                         if (result == NULL)
185                                 rc1 = -EPROTO;
186                         else
187                                 rc1 = result->our_rc;
188                 } else {
189                         rc1 = rc;
190                         if (unlikely(rc1 == 0))
191                                 rc1 = -EINVAL;
192                 }
193
194                 oar->oar_interpreter(env, reply, req, oar->oar_obj,
195                                        oar->oar_data, index, rc1);
196                 osp_async_request_fini(env, oar);
197                 index++;
198         }
199
200         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
201                 wake_up_all(oaua->oaua_waitq);
202
203         dt_update_request_destroy(dt_update);
204
205         return 0;
206 }
207
208 /**
209  * Pack all the requests in the shared asynchronous idempotent request queue
210  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
211  *
212  * \param[in] env       pointer to the thread context
213  * \param[in] osp       pointer to the OSP device
214  * \param[in] update    pointer to the shared queue
215  *
216  * \retval              0 for success
217  * \retval              negative error number on failure
218  */
219 int osp_unplug_async_request(const struct lu_env *env,
220                              struct osp_device *osp,
221                              struct dt_update_request *update)
222 {
223         struct osp_async_update_args    *args;
224         struct ptlrpc_request           *req = NULL;
225         int                              rc;
226
227         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
228                                  update->dur_buf.ub_req, &req);
229         if (rc != 0) {
230                 struct osp_async_request *oar;
231                 struct osp_async_request *next;
232
233                 list_for_each_entry_safe(oar, next,
234                                          &update->dur_cb_items, oar_list) {
235                         list_del_init(&oar->oar_list);
236                         oar->oar_interpreter(env, NULL, NULL, oar->oar_obj,
237                                                oar->oar_data, 0, rc);
238                         osp_async_request_fini(env, oar);
239                 }
240                 dt_update_request_destroy(update);
241         } else {
242                 LASSERT(list_empty(&update->dur_list));
243
244                 args = ptlrpc_req_async_args(req);
245                 args->oaua_update = update;
246                 args->oaua_count = NULL;
247                 args->oaua_waitq = NULL;
248                 args->oaua_flow_control = false;
249                 req->rq_interpret_reply = osp_async_update_interpret;
250                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
251         }
252
253         return rc;
254 }
255
256 /**
257  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
258  * request queue - osp_device::opd_async_requests.
259  *
260  * If the osp_device::opd_async_requests is not NULL, then return it directly;
261  * otherwise create new dt_update_request and attach it to opd_async_requests.
262  *
263  * \param[in] osp       pointer to the OSP device
264  *
265  * \retval              pointer to the shared queue
266  * \retval              negative error number on failure
267  */
268 static struct dt_update_request *
269 osp_find_or_create_async_update_request(struct osp_device *osp)
270 {
271         struct dt_update_request *update = osp->opd_async_requests;
272
273         if (update != NULL)
274                 return update;
275
276         update = dt_update_request_create(&osp->opd_dt_dev);
277         if (!IS_ERR(update))
278                 osp->opd_async_requests = update;
279
280         return update;
281 }
282
283 /**
284  * Insert an asynchronous idempotent request to the shared request queue that
285  * is attached to the osp_device.
286  *
287  * This function generates a new osp_async_request with the given parameters,
288  * then tries to insert the request into the osp_device-based shared request
289  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
290  * the shared queue firstly, and then re-tries.
291  *
292  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
293  *       osp_insert_async_request call from others.
294  *
295  * \param[in] env               pointer to the thread context
296  * \param[in] op                operation type, see 'enum update_type'
297  * \param[in] obj               pointer to the operation target
298  * \param[in] count             array size of the subsequent @lens and @bufs
299  * \param[in] lens              buffer length array for the subsequent @bufs
300  * \param[in] bufs              the buffers to compose the request
301  * \param[in] data              pointer to the data used by the interpreter
302  * \param[in] interpreter       pointer to the interpreter function
303  *
304  * \retval                      0 for success
305  * \retval                      negative error number on failure
306  */
307 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
308                              struct osp_object *obj, int count,
309                              __u16 *lens, const void **bufs, void *data,
310                              osp_async_request_interpreter_t interpreter)
311 {
312         struct osp_async_request     *oar;
313         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
314         struct dt_update_request     *update;
315         int                           rc  = 0;
316         ENTRY;
317
318         oar = osp_async_request_init(obj, data, interpreter);
319         if (oar == NULL)
320                 RETURN(-ENOMEM);
321
322         update = osp_find_or_create_async_update_request(osp);
323         if (IS_ERR(update))
324                 GOTO(out, rc = PTR_ERR(update));
325
326 again:
327         /* The queue is full. */
328         rc = out_update_pack(env, &update->dur_buf, op,
329                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
330                              0);
331         if (rc == -E2BIG) {
332                 osp->opd_async_requests = NULL;
333                 mutex_unlock(&osp->opd_async_requests_mutex);
334
335                 rc = osp_unplug_async_request(env, osp, update);
336                 mutex_lock(&osp->opd_async_requests_mutex);
337                 if (rc != 0)
338                         GOTO(out, rc);
339
340                 update = osp_find_or_create_async_update_request(osp);
341                 if (IS_ERR(update))
342                         GOTO(out, rc = PTR_ERR(update));
343
344                 goto again;
345         }
346
347         if (rc == 0)
348                 list_add_tail(&oar->oar_list, &update->dur_cb_items);
349
350         GOTO(out, rc);
351
352 out:
353         if (rc != 0)
354                 osp_async_request_fini(env, oar);
355
356         return rc;
357 }
358
359 /**
360  * The OSP layer dt_device_operations::dt_trans_create() interface
361  * to create a transaction.
362  *
363  * There are two kinds of transactions that will involve OSP:
364  *
365  * 1) If the transaction only contains the updates on remote server
366  *    (MDT or OST), such as re-generating the lost OST-object for
367  *    LFSCK, then it is a remote transaction. For remote transaction,
368  *    the upper layer caller (such as the LFSCK engine) will call the
369  *    dt_trans_create() (with the OSP dt_device as the parameter),
370  *    then the call will be directed to the osp_trans_create() that
371  *    creates the transaction handler and returns it to the caller.
372  *
373  * 2) If the transcation contains both local and remote updates,
374  *    such as cross MDTs create under DNE mode, then the upper layer
375  *    caller will not trigger osp_trans_create(). Instead, it will
376  *    call dt_trans_create() on other dt_device, such as LOD that
377  *    will generate the transaction handler. Such handler will be
378  *    used by the whole transaction in subsequent sub-operations.
379  *
380  * \param[in] env       pointer to the thread context
381  * \param[in] d         pointer to the OSP dt_device
382  *
383  * \retval              pointer to the transaction handler
384  * \retval              negative error number on failure
385  */
386 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
387 {
388         struct thandle          *th = NULL;
389         struct thandle_update   *tu = NULL;
390         int                      rc = 0;
391
392         OBD_ALLOC_PTR(th);
393         if (unlikely(th == NULL))
394                 GOTO(out, rc = -ENOMEM);
395
396         th->th_dev = d;
397         th->th_tags = LCT_TX_HANDLE;
398         atomic_set(&th->th_refc, 1);
399         th->th_alloc_size = sizeof(*th);
400
401         OBD_ALLOC_PTR(tu);
402         if (tu == NULL)
403                 GOTO(out, rc = -ENOMEM);
404
405         INIT_LIST_HEAD(&tu->tu_remote_update_list);
406         tu->tu_only_remote_trans = 1;
407         th->th_update = tu;
408
409 out:
410         if (rc != 0) {
411                 if (tu != NULL)
412                         OBD_FREE_PTR(tu);
413                 if (th != NULL)
414                         OBD_FREE_PTR(th);
415                 th = ERR_PTR(rc);
416         }
417
418         return th;
419 }
420
421 /**
422  * Trigger the request for remote updates.
423  *
424  * If the transaction is not a remote one or it is required to be sync mode
425  * (th->th_sync is set), then it will be sent synchronously; otherwise, the
426  * RPC will be sent asynchronously.
427  *
428  * Please refer to osp_trans_create() for transaction type.
429  *
430  * \param[in] env               pointer to the thread context
431  * \param[in] osp               pointer to the OSP device
432  * \param[in] dt_update         pointer to the dt_update_request
433  * \param[in] th                pointer to the transaction handler
434  * \param[in] flow_control      whether need to control the flow
435  *
436  * \retval                      0 for success
437  * \retval                      negative error number on failure
438  */
439 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
440                              struct dt_update_request *dt_update,
441                              struct thandle *th, bool flow_control)
442 {
443         struct thandle_update   *tu = th->th_update;
444         int                      rc = 0;
445
446         LASSERT(tu != NULL);
447
448         if (is_only_remote_trans(th)) {
449                 struct osp_async_update_args    *args;
450                 struct ptlrpc_request           *req;
451
452                 list_del_init(&dt_update->dur_list);
453                 if (th->th_sync) {
454                         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
455                                              dt_update, NULL);
456                         dt_update_request_destroy(dt_update);
457
458                         return rc;
459                 }
460
461                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
462                                          dt_update->dur_buf.ub_req, &req);
463                 if (rc == 0) {
464                         down_read(&osp->opd_async_updates_rwsem);
465
466                         args = ptlrpc_req_async_args(req);
467                         args->oaua_update = dt_update;
468                         args->oaua_count = &osp->opd_async_updates_count;
469                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
470                         args->oaua_flow_control = flow_control;
471                         req->rq_interpret_reply =
472                                 osp_async_update_interpret;
473
474                         atomic_inc(args->oaua_count);
475                         up_read(&osp->opd_async_updates_rwsem);
476
477                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
478                 } else {
479                         dt_update_request_destroy(dt_update);
480                 }
481         } else {
482                 th->th_sync = 1;
483                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
484                                      dt_update, NULL);
485         }
486
487         return rc;
488 }
489
490 /**
491  * The OSP layer dt_device_operations::dt_trans_start() interface
492  * to start the transaction.
493  *
494  * If the transaction is a remote transaction, then related remote
495  * updates will be triggered in the osp_trans_stop(); otherwise the
496  * transaction contains both local and remote update(s), then when
497  * the OUT RPC will be triggered depends on the operation, and is
498  * indicated by the dt_device::tu_sent_after_local_trans, for example:
499  *
500  * 1) If it is remote create, it will send the remote req after local
501  * transaction. i.e. create the object locally first, then insert the
502  * remote name entry.
503  *
504  * 2) If it is remote unlink, it will send the remote req before the
505  * local transaction, i.e. delete the name entry remotely first, then
506  * destroy the local object.
507  *
508  * Please refer to osp_trans_create() for transaction type.
509  *
510  * \param[in] env               pointer to the thread context
511  * \param[in] dt                pointer to the OSP dt_device
512  * \param[in] th                pointer to the transaction handler
513  *
514  * \retval                      0 for success
515  * \retval                      negative error number on failure
516  */
517 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
518                     struct thandle *th)
519 {
520         struct thandle_update           *tu = th->th_update;
521         struct dt_update_request        *dt_update;
522         int                              rc = 0;
523
524         if (tu == NULL)
525                 return rc;
526
527         /* Check whether there are updates related with this OSP */
528         dt_update = out_find_update(tu, dt);
529         if (dt_update == NULL)
530                 return rc;
531
532         if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
533                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
534                                        false);
535
536         return rc;
537 }
538
539 /**
540  * The OSP layer dt_device_operations::dt_trans_stop() interface
541  * to stop the transaction.
542  *
543  * If the transaction is a remote transaction, or the update handler
544  * is marked as 'tu_sent_after_local_trans', then related remote
545  * updates will be triggered here via osp_trans_trigger().
546  *
547  * For synchronous mode update or any failed update, the request
548  * will be destroyed explicitly when the osp_trans_stop().
549  *
550  * Please refer to osp_trans_create() for transaction type.
551  *
552  * \param[in] env               pointer to the thread context
553  * \param[in] dt                pointer to the OSP dt_device
554  * \param[in] th                pointer to the transaction handler
555  *
556  * \retval                      0 for success
557  * \retval                      negative error number on failure
558  */
559 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
560                    struct thandle *th)
561 {
562         struct thandle_update           *tu = th->th_update;
563         struct dt_update_request        *dt_update;
564         int                              rc = 0;
565         ENTRY;
566
567         LASSERT(tu != NULL);
568         LASSERT(tu != LP_POISON);
569
570         /* Check whether there are updates related with this OSP */
571         dt_update = out_find_update(tu, dt);
572         if (dt_update == NULL) {
573                 if (!is_only_remote_trans(th))
574                         RETURN(rc);
575
576                 GOTO(put, rc);
577         }
578
579         if (dt_update->dur_buf.ub_req == NULL ||
580             dt_update->dur_buf.ub_req->ourq_count == 0) {
581                 dt_update_request_destroy(dt_update);
582                 GOTO(put, rc);
583         }
584
585         if (is_only_remote_trans(th)) {
586                 if (th->th_result == 0) {
587                         struct osp_device *osp = dt2osp_dev(th->th_dev);
588                         struct client_obd *cli = &osp->opd_obd->u.cli;
589
590                         rc = obd_get_request_slot(cli);
591                         if (!osp->opd_imp_active || !osp->opd_imp_connected) {
592                                 if (rc == 0)
593                                         obd_put_request_slot(cli);
594
595                                 rc = -ENOTCONN;
596                         }
597
598                         if (rc != 0) {
599                                 dt_update_request_destroy(dt_update);
600                                 GOTO(put, rc);
601                         }
602
603                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
604                                                dt_update, th, true);
605                         if (rc != 0)
606                                 obd_put_request_slot(cli);
607                 } else {
608                         rc = th->th_result;
609                         dt_update_request_destroy(dt_update);
610                 }
611         } else {
612                 if (tu->tu_sent_after_local_trans)
613                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
614                                                dt_update, th, false);
615                 rc = dt_update->dur_rc;
616                 dt_update_request_destroy(dt_update);
617         }
618
619         GOTO(put, rc);
620
621 put:
622         thandle_put(th);
623         return rc;
624 }