Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p>I created a behaviour based on gen_server which I named gen_select. Using it you write a callback module using the module attribute <code>-behaviour(gen_select)</code>. In your init/1 callback you open an ets or dets file and define a match specification and a limit. The process will chunk through the table calling your <code>handle_record/2</code> callback for each record until the end of the file. I found this a handy paradigm for some "big data" sort of work I've been doing. You could use it on the underlying ets table of your mnesia table, if that's appropriate, or modify it to use mnesia:select/4.</p>
    <pre><code>%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%%   large number of records are read from an {@link //stdlib/ets. ets}
%%%   or {@link //stdlib/dets. dets} table. This is used in an application
%%%   to have supervised workers mapping over all the records in a table.
%%%   The user will call `gen_select:start_link/3', from a supervisor, to
%%%   create a process which will iterate over the selected records of a
%%%   table. The `init/1' callback should open the table to
%%%   be read and construct a match specification to be used to select
%%%   records from the table. It should return a tuple of the form:
%%%   ```
%%%   {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%%     TableType :: ets | dets
%%%     Table :: ets:tid() | atom() % when TableType=ets
%%%     Table :: dets:tab_name() % when TableType=dets
%%%     MatchSpec :: match_spec() % see ets:select/2
%%%     Limit :: integer() % see ets:select/3
%%%     State :: term()
%%%     Reason :: term()
%%%   '''
%%%   After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%%   or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%%   using the `match_spec()' and `Limit' returned by `init/1'. The
%%%   callback function `handle_record/2' will then be called for each
%%%   record returned then `select/1' will be called to get more records.
%%%   This is repeated until the end of the table is reached when the
%%%   callback `terminate/2' is called with `Reason=eof'.
%%%
-module(gen_select).
-author('vance@wavenet.lk').

%% export the gen_select API
-export([start_link/3]).

%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).

%% exports used internally
-export([init_it/6]).

%% define the callback exports of a module behaving as gen_select
-type state() :: term().

-callback init(Args :: term()) -&gt;
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: non_neg_integer(),
        State :: state()}
    | {stop, Reason :: term()} | ignore.
-callback handle_record(Record :: tuple(), State :: state()) -&gt;
    {next_record, NewState :: state()}
    | {stop, Reason :: term(), NewState :: state()}.
-callback terminate(Reason :: eof | term(), State :: state()) -&gt;
    any().

-import(error_logger, [format/2]).

%%----------------------------------------------------------------------
%%  gen_select API
%%----------------------------------------------------------------------

-spec start_link(Mod :: atom(), Args :: term(),
        Options :: gen:options()) -&gt; gen:start_ret().
%% @doc Creates a {@module} process as part of a supervision tree.
%%
start_link(Mod, Args, Options) -&gt;
    gen:start(?MODULE, link, Mod, Args, Options).

%%----------------------------------------------------------------------
%%  internal exports
%%----------------------------------------------------------------------

-spec init_it(Starter :: pid(), LinkP :: gen:linkage(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(),
        Options :: gen:options()) -&gt; no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%%   the process.
%%   Copied from //stdlib/gen_server:init_it/6.
%%
%%   The first chunk is read with `TableMod:select/3' so that both `ets'
%%   and `dets' tables work, and it is read before the starter is
%%   acknowledged so that exactly one `proc_lib:init_ack/2' is ever sent:
%%   `{ok, self()}' when the first read succeeds, `{error, Reason}'
%%   (`{error, eof}' for an empty selection) otherwise.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) -&gt;
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} -&gt;
            %% dispatch on the table module returned by init/1 (ets | dets)
            case catch TableMod:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) -&gt;
                    proc_lib:init_ack(Starter, {ok, self()}),
                    loop1(Parent, CallBackMod, Debug, State,
                            TableMod, Cont, Matches);
                '$end_of_table' -&gt;
                    proc_lib:init_ack(Starter, {error, eof}),
                    exit(eof);
                {error, Reason} -&gt;
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason);
                {'EXIT', Reason} -&gt;
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason)
            end;
        {stop, Reason} -&gt;
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore -&gt;
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} -&gt;
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else -&gt;
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.

%%----------------------------------------------------------------------
%%  system process callbacks
%%----------------------------------------------------------------------

-type misc() :: [CallBackMod :: atom() | [State :: state()
        | [TableMod :: atom() | [Cont :: term()
        | [Matches :: [tuple()] | []]]]]].

-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -&gt; no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to continue.
%% @private
system_continue(Parent, Debug,
        [CallBackMod, State, TableMod, Cont, Matches]) -&gt;
    loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches).

-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -&gt; no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to terminate.
%% @private
system_terminate(Reason, _Parent, Debug,
        [CallBackMod, State, _TableMod, _Cont, _Matches]) -&gt;
    terminate(Reason, CallBackMod, Debug, State).

-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) -&gt;
    {ok, NewMisc :: misc()}.
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%%   Delegates to the callback module's `code_change/3'; any result other
%%   than `{ok, NewState}' (including a caught exit) is returned as is.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches],
        _Module, OldVsn, Extra) -&gt;
    case catch CallBackMod:code_change(OldVsn, State, Extra) of
        {ok, NewState} -&gt;
            {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
        Other -&gt;
            Other
    end.

-type pdict() :: [{Key :: term(), Value :: term()}].
-type status_data() :: [PDict :: pdict() | [SysState :: term()
        | [Parent :: pid() | [Debug :: [gen:dbg_opt()]
        | [Misc :: misc() | []]]]]].

-spec format_status(Opt :: normal | terminate,
        StatusData :: status_data()) -&gt; [tuple()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%%   Uses the callback module's `format_status/2' when it is exported,
%%   falling back to the raw state when it is not or when it crashes.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug,
        [CallBackMod, State, _TableMod, _Cont, _Matches]]) -&gt;
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    DefaultStatus = [{data, [{"State", State}]}],
    Specific = case erlang:function_exported(CallBackMod, format_status, 2) of
        true -&gt;
            case catch CallBackMod:format_status(Opt, [PDict, State]) of
                {'EXIT', _} -&gt;
                    DefaultStatus;
                StatusList when is_list(StatusList) -&gt;
                    StatusList;
                Else -&gt;
                    [Else]
            end;
        _ -&gt;
            DefaultStatus
    end,
    [{header, Header},
            {data, [{"Status", SysState},
                    {"Parent", Parent},
                    {"Logged events", Log}]}
            | Specific].

%%----------------------------------------------------------------------
%%  internal functions
%%----------------------------------------------------------------------

-spec loop1(Parent :: pid(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state(),
        TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -&gt; no_return().
%% @doc Main loop.
%%   Handles any pending system or parent exit message, otherwise
%%   continues with {@link loop2/7}. Any other message is reported as a
%%   debug event and dropped, and the loop continues.
%%   Copied from //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) -&gt;
    receive
        {system, From, Req} -&gt;
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                    [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} -&gt;
            terminate(Reason, CallBackMod, Debug, State);
        Msg -&gt;
            %% keep looping with the updated debug state; returning here
            %% would end the process without calling terminate/2
            NewDebug = sys:handle_debug(Debug, fun print_event/3,
                    self(), {in, Msg}),
            loop1(Parent, CallBackMod, NewDebug, State,
                    TableMod, Cont, Matches)
    after 0 -&gt;
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.

-spec loop2(Parent :: pid(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state(),
        TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -&gt; no_return().
%% @doc Run the `select/1' function.
%%   Passes the next buffered record to `handle_record/2'; when the
%%   buffer is empty reads the next chunk with `TableMod:select/1'.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) -&gt;
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} -&gt;
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} -&gt;
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} -&gt;
            terminate(Reason, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) -&gt;
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) -&gt;
            sys:handle_debug(Debug, fun print_event/3, self(),
                    {read, Matches}),
            loop1(Parent, CallBackMod, Debug, State,
                    TableMod, NewCont, Matches);
        '$end_of_table' -&gt;
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} -&gt;
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} -&gt;
            terminate(Reason, CallBackMod, Debug, State)
    end.

-spec terminate(Reason :: term(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state()) -&gt; no_return().
%% @doc Terminate the {@module} process.
%%   Calls the callback module's `terminate/2' and exits with `Reason';
%%   any reason other than `normal', `shutdown' or `{shutdown, _}'
%%   (including `eof') is also written to the error log.
%%   Copied from //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) -&gt;
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', R} -&gt;
            error_info(R, State, Debug),
            exit(R);
        _ -&gt;
            case Reason of
                normal -&gt;
                    exit(normal);
                shutdown -&gt;
                    exit(shutdown);
                {shutdown, _} = Shutdown -&gt;
                    exit(Shutdown);
                _ -&gt;
                    FmtState = case erlang:function_exported(CallBackMod,
                            format_status, 2) of
                        true -&gt;
                            case catch CallBackMod:format_status(terminate,
                                    [get(), State]) of
                                {'EXIT', _} -&gt;
                                    State;
                                Else -&gt;
                                    Else
                            end;
                        _ -&gt;
                            State
                    end,
                    error_info(Reason, FmtState, Debug),
                    exit(Reason)
            end
    end.

-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -&gt; ok.
%% @doc Print error log message.
%%   An `undef' reason is refined to say whether the module could not be
%%   loaded or the function is not exported.
%%   Copied from //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) -&gt;
    Reason1 = case Reason of
        {undef, [{M, F, A, L} | MFAs]} -&gt;
            case code:is_loaded(M) of
                false -&gt;
                    {'module could not be loaded', [{M, F, A, L} | MFAs]};
                _ -&gt;
                    case erlang:function_exported(M, F, length(A)) of
                        true -&gt;
                            Reason;
                        false -&gt;
                            {'function not exported', [{M, F, A, L} | MFAs]}
                    end
            end;
        _ -&gt;
            Reason
    end,
    format("** Table reader ~p terminating \n"
            "** When Server state == ~p~n"
            "** Reason for termination == ~n** ~p~n",
            [self(), State, Reason1]),
    sys:print_log(Debug),
    ok.

%% Copied from //stdlib/gen_server:opt/2
opt(Op, [{Op, Value} | _]) -&gt;
    {ok, Value};
opt(Op, [_ | Options]) -&gt;
    opt(Op, Options);
opt(_, []) -&gt;
    false.

%% Copied from //stdlib/gen_server:debug_options/2
debug_options(Name, Opts) -&gt;
    case opt(debug, Opts) of
        {ok, Options} -&gt;
            dbg_options(Name, Options);
        _ -&gt;
            dbg_options(Name, [])
    end.

%% Copied from //stdlib/gen_server:dbg_options/2
dbg_options(Name, []) -&gt;
    Opts = case init:get_argument(generic_debug) of
        error -&gt;
            [];
        _ -&gt;
            [log, statistics]
    end,
    dbg_opts(Name, Opts);
dbg_options(Name, Opts) -&gt;
    dbg_opts(Name, Opts).

%% Copied from //stdlib/gen_server:dbg_opts/2
dbg_opts(Name, Opts) -&gt;
    case catch sys:debug_options(Opts) of
        {'EXIT',_} -&gt;
            format("~p: ignoring erroneous debug options - ~p~n",
                    [Name, Opts]),
            [];
        Dbg -&gt;
            Dbg
    end.

-spec print_event(IoDevice :: io:device(), Event :: term(),
        Pid :: pid()) -&gt; ok.
%% @doc Called by {@link //sys:handle_debug/4} to print trace events.
print_event(Dev, {in, Msg}, Pid) -&gt;
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]);
print_event(Dev, {read, Matches}, Pid) -&gt;
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, length(Matches)]).</code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload