Here I record the logs of exploring, changing of aiida_core code base. The post is organized in date.
The project entry point
I explore the code base of aiida_core, to decide where to kick off gsoc project. There are lots of code involved with tornado, and I divide them into following several categories.
- The code interact with
- The component that need asynchronous supported natually, e.g.
- coroutines are running at the event loop of runner object.
engine/transports.py could be the good starting point.
The transports is a component in aiida_core used for handle the connection with to the client computer. In principle, it may cost some time to open the link to the computer, therefore it should not be blocked. That is why asynchronous programming intervened.
TransportQueue object has its own loop, if not set, use the event loop of the main thread
IOLoop.current() . Then all the coroutines of transport tasks are running in this loop.
inspect.stack is now support nametuple to
inspect the function name. Therefore, can be improved for
ensure_future in process async test
Now I change process test in plum by launching a process throughly asynchronous
asyncio.ensure_future. The previous implementation is adding a callback
function in the loop and then launch the loop. Here with pytest asyncio, I
have the whole test function running in a loop by decorate the test with
@pytest.mark.asyncio. As a result, I cannot add callback in the current loop, while
have to launch the coroutine by
ensure_future. This is not wrong in the situation, but
this implementation is not compatible with the process test case in aiida_core which
add the function by
loop.add_callback and then running the loop.
So my question is which implementation is good? Can I use the same approach in aiida_core? Check the two types of implementation at here
My understanding is that ensure_future is register a coroutine to be run in the future.
ensure_future is the asynchronous counterpart of the
call_soon which for running a
synchronous function in event loop. And to run function in the event loop of other
call_soon_threadsafe and its asynchronous counterpart
Please check https://cheat.readthedocs.io/en/latest/python/asyncio.html for more information.
CalculationFuture object only get set when the calculation is finished.
There are two ways to get the result of
CalculationFuture set. One is by communicating over
RabbitMQ broadcast, the other is by polling and waiting for poll interval before check whether
is the result set.
_poll_calculation is a coroutine since it is counting and check the state of future,
it is asynchronous with process, they are running in the same event loop of runner.
When a function is running by using
exponential_backoff_retry, the function
is first change to a coroutine if is not. Then the function is retry until it
is successfully run. The
exponential_backoff_retry itself is a coroutine since
it should not block other steps of process, and it is
run in an event loop. Every time the function is retried,
adds a asynchronous counter to count the time to wait the function to be run.
The time of counter is exponentially increased. Therefore, it is named
When changing tornado to asyncio, the logger of asyncio should also be implement.
For reference, https://docs.python.org/3/library/asyncio-dev.html#logging
Just setting loggers in file
And this seems like the first thing should be done before refactoring from tornado to asyncio.
continue_process receive pid of process as input and running the process. The pid
is stored in some media of a persist process.
launch_process directly running
the incoming process, by initialize the process class to corresponding object.
Here, I can try to add a test for redefined
_continue function in aiida
raise gen.Return(result) in process_function
process_function which wrap the turn the function into
a FunctionProcess. If process is executed not in a daemon but in a local runner,
aiida is listen to the signal of interrupt and handle the signal with
function which raise a critical log and kill the running process elegantly. However,
here in the
kill_process, the result is returned by
raise gen.Return(result). I am
confused that this function is not a coroutine, why use such syntax? Can we directly return
The mechanism that allowed engine connecting to the remote machine is called transport,
and the transport tasks are the tasks performed in transport stage. The transport tasks
CalcJob process. First of all,
CalcJob overwrite the
run method of
Process class. It enter the UPLOAD state by
plumpy.Wait when run by
run method. Then after UPLOAD
state, it enter the SUBMIT and then UPDATE and then RETRIEVE state step by step.
The state transition in Waiting state of
CalcJob is done by
create_state methods which
are defined in function
retrieve. Everytime the process
got its change to run, it will check its current state and tries to move to next state.
Question: different between
submit functions of runner and launch module.
There are two similar
submit method both in
aiida/engine/launch module and
aiida/engine/runner. What’s the different? And when to use which?
Please check: https://aiida.readthedocs.io/projects/aiida-core/en/latest/working/workflows.html#submitting-sub-processes The runner submit is used in subworkflow submitting specifically. In work chain, the upper process should wait until the subprocess is surely finished, and the future to resolved, before continue.
Although I get the explanation of their difference in design, I still unable to find the
real implementation different between them. They both run with
up bound the number of jobs in the schedule queue
This is the requirement mentioned in the issue#88.
Even the user case is not very common, to extend the usage of the software is always seems
like not a bad thing. After look into the code base, I assumed this functionality should be
aiida/engine/processes/calcjobs/task.py::task_submit_job. Before actually
submit the uploaded job by
execmanager, inspect the number of jobs queued at the
scheduler of a specific user. Only submit the job when the number limitation is unreached.
analyse the code of circus part in aiida_core
One thing to sure, is from start to end, only one circus watcher is created and used and spawn many processes(system) for running processes(AiiDA).
Here are something I am not fully understanded.
- Are the daemon processes start by circus independent? Can they communicate with other processes? And how?
verdicommand launch aiida processes to daemon, how circus balance load them?
add new feature so that specific type node can be quickly spot in the graph
In order to find the initial configuration of a final most stable structure in my
field research, it is easy to use AiiDA and pick the initial configuration from
the provenance graph. However, sometimes when the workflow is very complex, it is
not very easy to recognize the desired nodes from provenance graph. To overcome
this, I simply add an option
verdi node graph generate to
colored the nodes with expected class and left other types of nodes decolorized.
The details can be found in https://github.com/aiidateam/aiida-core/pull/4081 . This PR also fix the origin node highlight feature.
Some tests are failed locally
Running the tests with the latest version (5e02e164b) of aiida_core, I got some tests not passed. Some of the are related to the asynchronous programming, therefore, necessarily to be fixed before head. The failed tests are:
=============================================== short test summary info ================================================ FAILED tests/cmdline/commands/test_daemon.py::TestVerdiDaemon::test_daemon_restart - AssertionError: SystemExit(<Exit... FAILED tests/cmdline/commands/test_daemon.py::TestVerdiDaemon::test_daemon_start - AssertionError: 'daemon-error-not-... FAILED tests/cmdline/commands/test_import.py::TestVerdiImport::test_import_old_url_archives - AssertionError: SystemE... FAILED tests/cmdline/commands/test_import.py::TestVerdiImport::test_import_url_and_local_archives - AssertionError: S... FAILED tests/sphinxext/test_workchain.py::test_workchain_build - assert 2 == 0 FAILED tests/sphinxext/test_workchain.py::test_broken_workchain_build - assert 'The broken workchain says hi!' in "\n... FAILED tests/transports/test_all_plugins.py::TestBasicFunctionality::test_is_open - tests.transports.test_all_plugins... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_chdir_to_empty_string - tests.transports... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_dir_copy - tests.transports.test_all_plu... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_dir_creation_deletion - tests.transports... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_dir_permissions_creation_modification - ... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_dir_reading_permissions - tests.transpor... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_isfile_isdir_to_empty_string - tests.tra... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_isfile_isdir_to_non_existing_string - te... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_listdir - tests.transports.test_all_plug... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_listdir_withattributes - tests.transport... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_makedirs - tests.transports.test_all_plu... FAILED tests/transports/test_all_plugins.py::TestDirectoryManipulation::test_rmtree - tests.transports.test_all_plugi... FAILED tests/transports/test_all_plugins.py::TestPutGetFile::test_put_and_get - tests.transports.test_all_plugins.run... FAILED tests/transports/test_all_plugins.py::TestPutGetFile::test_put_get_abs_path - tests.transports.test_all_plugin... FAILED tests/transports/test_all_plugins.py::TestPutGetFile::test_put_get_empty_string - tests.transports.test_all_pl... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_copy - tests.transports.test_all_plugins.run_for_al... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_get - tests.transports.test_all_plugins.run_for_all... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_put - tests.transports.test_all_plugins.run_for_all... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_put_and_get - tests.transports.test_all_plugins.run... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_put_and_get_overwrite - tests.transports.test_all_p... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_put_get_abs_path - tests.transports.test_all_plugin... FAILED tests/transports/test_all_plugins.py::TestPutGetTree::test_put_get_empty_string - tests.transports.test_all_pl... FAILED tests/transports/test_all_plugins.py::TestExecuteCommandWait::test_exec_pwd - tests.transports.test_all_plugin... FAILED tests/transports/test_all_plugins.py::TestExecuteCommandWait::test_exec_with_stdin_filelike - tests.transports... FAILED tests/transports/test_all_plugins.py::TestExecuteCommandWait::test_exec_with_stdin_string - tests.transports.t... FAILED tests/transports/test_all_plugins.py::TestExecuteCommandWait::test_exec_with_stdin_unicode - tests.transports.... FAILED tests/transports/test_all_plugins.py::TestExecuteCommandWait::test_exec_with_wrong_stdin - tests.transports.te... FAILED tests/transports/test_ssh.py::TestBasicConnection::test_auto_add_policy - paramiko.ssh_exception.NoValidConnec... FAILED tests/transports/test_ssh.py::TestBasicConnection::test_no_host_key - paramiko.ssh_exception.NoValidConnection... ========================= 35 failed, 1438 passed, 16 skipped, 33 warnings in 261.39s (0:04:21) =========================
InterruptableFuture is the class defined in file
unittest mock the presubmit of CalcJob
mock.patch of unittest used for?
Getting stuck in WorkChain part
The problem is when running nested WorkChain, the inner workchain is
run but not returned to set Context variable. After several hours debugging, I am sure
the inner workchain is reaching the
Finished state, but, then the event loop
is block by something. Can not figure out why.
Only one event loop bond to the thread
Maybe the same reason which caused the above problem. When outer most process is running, a process is started and running, then no other event loop is allowed to be run.
To clarify my guess, the following test case passed:
def test_process_nested(self): """ Run multiple and nested processes to make sure the process stack is always correct with process in new event loop. """ expect_true =  class StackTest(plumpy.Process): def run(self): # TODO: unexpected behaviour here # if assert error happend here not raise # it will be handled by try except clause in process # is there better way to handle this? expect_true.append(self == Process.current()) class ParentProcess(plumpy.Process): def run(self): expect_true.append(self == Process.current()) StackTest(loop=events.new_event_loop()).execute() ParentProcess().execute()
As we can expected, it is not working for asyncio.
For now, I find
nest_asyncio patch package can be used to tackle this
problem. But not sure is it the ultimate way?
Why process_class build from FunctionProcess pass runner
Ha! My first code review
Thanks for the chance provided by tutor Leopold,
this is my first code review in
In this PR, new feature yaml config from online url is added.
Details of single event loop problem
To recap the problem I encountered which blocks me to go further. Different from
tornado<5.0 the event loop of asyncio (
tornado>=5.0 also use
asyncio event loop)
did not support running nest event loop which will raise a
RuntimeError: This event loop is already running in python
Please reference of https://stackoverflow.com/questions/46827007/runtimeerror-this-event-loop-is-already-running-in-python for more details of this problem itself.
execute class function of
Process, an asynchronous call of
is scheduled in the non-async code here and wait for the result until it is done.
There seems not way to schedule more asynchronous calls here and wait, since the main
thread is block by the last
As mentioned above,
plumpy uses this feature for nesting and execute process.
This pattern also occurs in simple workflows and workchains of
where the calcfunction
mul are executed inside the
@calcfunction def add(data_a, data_b): return data_a + data_b @calcfunction def mul(data_a, data_b): return data_a * data_b @workfunction def add_mul_wf(data_a, data_b, data_c): return mul(add(data_a, data_b), data_c) result, node = add_mul_wf.run_get_node(orm.Int(3), orm.Int(4), orm.Int(5)) self.assertEqual(result, (3 + 4) * 5) self.assertIsInstance(node, orm.WorkFunctionNode)
nest-asyncio can be used to overcome this, but I am not sure about it:
- Is there good way to do this without using
- The nested process should be running in the same event loop of its parent process or a new event loop?
As for the second question, I did some experiment, and here are some ideas
and my understanding of the code base. In the
add_mul_wf above, when a new
FunctionProcess is created and run which
run_get_node, a new runner is
created from scratch with new_event_loop by
manager.create_runner. It shows that in old code nested
process is running in a different event loop. With the help of
the processes can also run in a single event loop say event loop got by
only thing should notice is when all tasks are done in the loop, it will be closed automatically.
If I am missing something please point it out.
Replace tornado with asyncio in rmq module
after that in engine test cases only:L
FAILED tests/engine/test_manager.py::TestJobManager::test_request_job_info_update - TypeError: coroutines cannot be u... FAILED tests/engine/test_runners.py::TestWorkchain::test_call_on_calculation_finish - AttributeError: '_UnixSelectorE... FAILED tests/engine/test_utils.py::TestInterruptable::test_interrupt - TypeError: a callable object was expected by c...
communicator hang up when disconnected
The problem can be reproduce with aiida_core https://github.com/unkcpz/aiida_core/tree/gsoc-rmq and plumpy https://github.com/unkcpz/plumpy/tree/gsoc-asyncio (and asyncio kiwipy). By running test case
$ pytest tests/engine/test_rmq.py::TestProcessControl::test_kill -sv
And also same problems for
TestProcessControl::test_pause .. But with process finished without exception for example
tearDown will not timeout.
Now, I also test with following dependent matrix:
aiida_core https://github.com/unkcpz/aiida_core/tree/gsoc-rmq and plumpy https://github.com/unkcpz/plumpy/tree/gsoc-asyncio and
kiwipy<0.6.0. And there is no exception no hang up.
it is more certain that the new version kiwipy may have some incompatible features
call_at only receive callback
How to add coroutine to
just by pass the coroutine to
asyncio.create_task (ensure_future for py<3.7)
async def coro(): await asyncio.sleep(1.) delay = 5. call_at(delay, asyncio.ensure_future, coro())
Tutorial test with new aiida_core
Now the aiida_core is totally migrated to asyncio. Tests beyond unittest is necessary.
- Process status not updated when process finished. (Only CalcJob has this issue)
- daemon stop hang up. But after timeout, it is stopped.