Examples

This page demonstrates how to use the general modules of multirunnable and their APIs. It covers 4 points and 2 features:

  • 4 Points
    • multirunnable.SimpleExecutor

    • multirunnable.SimplePool

    • Executor or Pool with multirunnable.persistence.file

    • Executor or Pool with multirunnable.persistence.database

  • 2 Features
    • Lock with Python decorators

    • Retry mechanism

4 Points

This part demonstrates the core modules and APIs of multirunnable.

Simple Executor

The following example shows how to use multirunnable.SimpleExecutor:

from multirunnable import RunningMode, SimpleExecutor
import multirunnable as mr

def target_function(*args, **kwargs) -> str:
    # Simulate a task that takes 3 seconds
    mr.sleep(3)
    print("This is target function args: ", args)
    print("This is target function kwargs: ", kwargs)
    return "Hello, Return"

_executor_number = 3
# # # # Initial Executor object
_executor = SimpleExecutor(mode=RunningMode.Parallel, executors=_executor_number)
# _executor = SimpleExecutor(mode=RunningMode.Concurrent, executors=_executor_number)
# _executor = SimpleExecutor(mode=RunningMode.GreenThread, executors=_executor_number)

# # # # Running the Executor
_executor.run(function=target_function, args=("index_1", "index_2.2"))

# # # # Get result
_result = _executor.get_result()
print("Result: ", _result)
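
For readers new to this pattern, the idea can be reproduced with the standard library alone: one target function is started once per worker and the return values are collected afterwards. The sketch below is only an analogy for the Concurrent mode, assuming the executor starts the target once per executor; it is not how multirunnable is implemented, and run_in_workers is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def target_function(*args, **kwargs) -> str:
    time.sleep(0.1)  # stand-in for real work
    return f"Hello, Return {args}"


def run_in_workers(function, args, workers: int):
    """Hypothetical helper: run function(*args) once per worker thread."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(function, *args) for _ in range(workers)]
        # Collect the return values in submission order
        return [future.result() for future in futures]


results = run_in_workers(target_function, ("index_1", "index_2.2"), workers=3)
print("Result: ", results)
```

Each worker receives the same arguments, so the list holds one return value per worker.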

Simple Pool

Using multirunnable.SimplePool:

from multirunnable import RunningMode, SimplePool, sleep
import random

# Example sizes; adjust them to your workload
_pool_size = 3
_task_size = 10

def target_function(*args, **kwargs) -> str:
    sleep(3)
    print("This is target function args: ", args)
    print("This is target function kwargs: ", kwargs)
    return "Hello, Return"

# # # # Initial Pool object
_pool = SimplePool(mode=RunningMode.Parallel, pool_size=_pool_size, tasks_size=_task_size)
# _pool = SimplePool(mode=RunningMode.Concurrent, pool_size=_pool_size, tasks_size=_task_size)
# _pool = SimplePool(mode=RunningMode.GreenThread, pool_size=_pool_size, tasks_size=_task_size)

_result = None
with _pool as p:
    # # # # Running Pool
    # p.apply(function=target_function, kwargs={"index": f"test_{random.randrange(10,20)}"})
    p.async_apply(function=target_function, kwargs={"index": f"test_{random.randrange(10,20)}"})
    p.map(function=target_function, args_iter=("index_1", "index_2.2", "index_3"))
    # p.map_by_args(function=target_function, args_iter=[("index_1", "index_2.2"), ("index_3",), (1, 2, 3)])

    # # # # Get result
    # # # # You will get the result of 'map' only.
    _result = p.get_result()

print("Result: ", _result)
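
The difference between an asynchronous apply and a map can be seen with the standard library's thread pool. This is a sketch of the general pool pattern only, not of multirunnable's internals: apply_async submits a single call and returns a handle, while map distributes one argument per call and returns the results in input order.

```python
from multiprocessing.pool import ThreadPool


def target_function(index: str) -> str:
    return f"Hello, {index}"


with ThreadPool(processes=2) as pool:
    # apply_async returns immediately; the value is fetched from the handle later
    async_handle = pool.apply_async(target_function, kwds={"index": "test_1"})
    # map blocks until every item is processed and keeps the input order
    mapped = pool.map(target_function, ["index_1", "index_2.2", "index_3"])
    applied = async_handle.get(timeout=5)

print("apply_async result: ", applied)
print("map result: ", mapped)
```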

Persistence - File

To persist data as files, use the FAO (File Access Object) class BaseFao directly:

fao = BaseFao(strategy=SavingStrategy.ALL_THREADS_ONE_FILE)
fao.save_as_csv(mode="a+", file="testing.csv", data=_data)
fao.save_as_excel(mode="a+", file="testing.xlsx", data=_data)
fao.save_as_json(mode="a+", file="testing.json", data=_data)
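
The snippet above does not define _data. As an illustration only, the sketch below assumes it is a list of rows (a header row followed by data rows) and writes that shape to CSV and JSON with the standard library. The data shape, the temporary directory and the file handling are assumptions of this sketch, not BaseFao behavior.

```python
import csv
import json
import tempfile
from pathlib import Path

# Hypothetical sample: a header row followed by data rows
_data = [["col_1", "col_2"], ["a", 1], ["b", 2]]

output_dir = Path(tempfile.mkdtemp())
csv_path = output_dir / "testing.csv"
json_path = output_dir / "testing.json"

# newline="" avoids blank lines between rows on Windows
with open(csv_path, mode="a+", newline="", encoding="utf-8") as csv_file:
    csv.writer(csv_file).writerows(_data)

with open(json_path, mode="w", encoding="utf-8") as json_file:
    json.dump(_data, json_file)

# Read both files back to check what was stored
with open(csv_path, newline="", encoding="utf-8") as csv_file:
    csv_rows = list(csv.reader(csv_file))
json_rows = json.loads(json_path.read_text(encoding="utf-8"))
```

Note that CSV stores every value as text, so numbers come back as strings, while JSON keeps the number type.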

The template implementations may be removed so that subclasses implement them, as in the database subpackage. If that decision is confirmed, they will be deprecated in version 0.18.0 and removed in version 0.19.0.

Persistence - Database

The subpackage multirunnable.persistence.database has 3 sections.

  • Connection Factory

    module: multirunnable.persistence.database.strategy

    • Single Connection

    • Connection Pool

  • Database Operators

    module: multirunnable.persistence.database.operator

The connection factory is responsible for generating the connection or connection pool instance(s). The operator is responsible for performing operations on the database through the connection instance that the connection factory generates.

To implement customized persistence objects for a database, inherit from the following classes as needed:

  • Connection Factory
    • Single Connection:

      object: BaseSingleConnection

    • Connection Pool:

      object: BaseConnectionPool

  • Database Operators:

    object: DatabaseOperator

Choose only one of the two Connection Factory classes. The demonstrations below show how to implement them, using MySQL as the example.

For BaseSingleConnection object:

from mysql.connector.connection import MySQLConnection
from mysql.connector.cursor import MySQLCursor
import mysql.connector
from multirunnable.persistence.database.strategy import BaseSingleConnection


class MySQLSingleConnection(BaseSingleConnection):

    @property
    def connection(self) -> MySQLConnection:
        return self._database_connection


    def connect_database(self, **kwargs) -> MySQLConnection:
        _connection = mysql.connector.connect(**kwargs)
        return _connection


    def commit(self) -> None:
        self.connection.commit()


    def close(self) -> None:
        if self.connection is not None and self.connection.is_connected():
            self.connection.close()

For BaseConnectionPool object:

import logging
import time
from mysql.connector.connection import MySQLConnection
from mysql.connector.pooling import MySQLConnectionPool, PooledMySQLConnection
from mysql.connector.errors import PoolError
from mysql.connector.cursor import MySQLCursor
import mysql.connector
from multirunnable.persistence.database.strategy import BaseConnectionPool, get_connection_pool


class MySQLDriverConnectionPool(BaseConnectionPool):

    def connect_database(self, **kwargs) -> MySQLConnectionPool:
        connection_pool = MySQLConnectionPool(**kwargs)
        return connection_pool


    def get_one_connection(self, pool_name: str = "", **kwargs) -> PooledMySQLConnection:
        while True:
            try:
                _connection = get_connection_pool(pool_name=pool_name).get_connection()
                logging.info(f"Get a valid connection: {_connection}")
                return _connection
            except PoolError:
                # The pool has no free connection: wait, then try again
                logging.error(f"Connection Pool: {get_connection_pool(pool_name=pool_name)} ")
                logging.error("Will sleep for 5 seconds to wait for a connection to become available.")
                time.sleep(5)
            except AttributeError as ae:
                # No connection pool is available under this name
                raise ConnectionError(f"Cannot get a connection instance from the connection pool because no connection pool named '{pool_name}' exists.") from ae


    def commit(self) -> None:
        self.connection.commit()


    def close_pool(self, pool_name: str) -> None:
        get_connection_pool(pool_name=pool_name).close()


    def close(self) -> None:
        if self.connection is not None and self.connection.is_connected():
            self.connection.close()
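
The wait-and-retry loop in get_one_connection can be tried without a database. The sketch below uses a hypothetical TinyPool backed by queue.Queue in place of MySQLConnectionPool, and it differs from the loop above in one deliberate way: it gives up after max_attempts instead of looping forever, so an exhausted pool surfaces as an error.

```python
import queue
import time


class TinyPool:
    """Hypothetical fixed-size pool backed by a queue of idle connections."""

    def __init__(self, connections) -> None:
        self._idle = queue.Queue()
        for connection in connections:
            self._idle.put(connection)

    def get_connection(self):
        # Raises queue.Empty when every connection is in use
        return self._idle.get_nowait()

    def release(self, connection) -> None:
        self._idle.put(connection)


def get_one_connection(pool: TinyPool, retry_interval: float = 0.01, max_attempts: int = 5):
    for _ in range(max_attempts):
        try:
            return pool.get_connection()
        except queue.Empty:
            # No idle connection yet: wait, then try again
            time.sleep(retry_interval)
    raise ConnectionError(f"No connection became available after {max_attempts} attempts.")


pool = TinyPool(["conn_1"])
first = get_one_connection(pool)

try:
    get_one_connection(pool, max_attempts=2)
    exhausted = False
except ConnectionError:
    exhausted = True

pool.release(first)
second = get_one_connection(pool)
```

A bounded retry count is one possible design choice. An unbounded loop, as in the MySQL example, suits workers that must eventually obtain a connection.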

For DatabaseOperator object:

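from typing import Any, Dict, List, Optional, Tuple, Union

from multirunnable.persistence.database.strategy import BaseDatabaseConnection
from multirunnable.persistence.database.operator import DatabaseOperator


class MySQLOperator(DatabaseOperator):

    def __init__(self, conn_strategy: BaseDatabaseConnection, db_config: Optional[Dict] = None):
        # Use None instead of a shared mutable default argument
        super().__init__(conn_strategy=conn_strategy, db_config=db_config or {})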


    def initial_cursor(self, connection: Union[MySQLConnection, PooledMySQLConnection]) -> MySQLCursor:
        return connection.cursor(buffered=True)


    @property
    def column_names(self) -> Tuple[str, ...]:
        return self._cursor.column_names


    @property
    def row_count(self) -> int:
        return self._cursor.rowcount


    def next(self) -> Any:
        return self._cursor.next()


    def execute(self, operator: Any, params: Tuple = None, multi: bool = False) -> Any:
        return self._cursor.execute(operation=operator, params=params, multi=multi)


    def execute_many(self, operator: Any, seq_params=None) -> Any:
        return self._cursor.executemany(operation=operator, seq_params=seq_params)


    def fetch(self) -> Any:
        return self._cursor.fetch()


    def fetch_one(self) -> Any:
        return self._cursor.fetchone()


    def fetch_many(self, size: int = None) -> List[Any]:
        return self._cursor.fetchmany(size=size)


    def fetch_all(self) -> List[Any]:
        return self._cursor.fetchall()


    def reset(self) -> None:
        self._cursor.reset()

Here is an example of how to use them:

_database_config = {
    "host": "127.0.0.1",
    "port": "3306",
    "user": "root",
    "password": "password",
    "database": "test"
}

# # Using single connection strategy
_db_opts = MySQLOperator(MySQLSingleConnection(**_database_config))
# # Using connection pool strategy
# _db_opts = MySQLOperator(MySQLDriverConnectionPool(**_database_config))

_db_opts.execute('SELECT col_1, col_2 FROM test.test_table LIMIT 10')
_data = _db_opts.fetch_all()

2 Features

This part demonstrates some features of multirunnable that are exposed through Python syntactic sugar.

Lock with Python decorators

The following example shows how to apply the Lock feature to a function with a decorator.

from multirunnable.api import RunWith
from multirunnable import sleep

@RunWith.Lock
def lock_function():
    print("This is testing process with Lock and sleep for 3 seconds.")
    sleep(3)
    return "Return_Value"
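
To show what a lock decorator does in general, here is a minimal standard-library sketch. run_with_lock is a hypothetical decorator written for this illustration and says nothing about how RunWith.Lock is implemented: it acquires a shared threading.Lock before the call and releases it afterwards, so only one thread runs the function body at a time.

```python
import functools
import threading

_lock = threading.Lock()


def run_with_lock(function):
    """Hypothetical decorator: serialize calls to function with one shared lock."""
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        with _lock:
            return function(*args, **kwargs)
    return wrapper


counter = {"value": 0}


@run_with_lock
def increase() -> int:
    # Read-modify-write is safe here because the lock is held
    current = counter["value"]
    counter["value"] = current + 1
    return counter["value"]


threads = [threading.Thread(target=increase) for _ in range(20)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter["value"])
```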

Retry mechanism

The following example shows how to use the ‘retry’ feature.

from multirunnable.api import retry
import multirunnable

class ExampleTargetFunction:

    def target_function(self, *args, **kwargs) -> str:
        multirunnable.sleep(3)
        return "Return_Value."


    @retry
    def target_fail_function(self, *args, **kwargs) -> None:
        print("It will raise exception after 3 seconds ...")
        multirunnable.sleep(3)
        raise Exception("Test for error")


    @target_fail_function.initialization
    def initial(self):
        print("This is testing initialization")


    @target_fail_function.done_handling
    def done(self, result):
        print("This is testing done process")
        print("Get something result: ", result)


    @target_fail_function.final_handling
    def final(self):
        print("This is final process")


    @target_fail_function.error_handling
    def error(self, error):
        print("This is error process")
        print("Get something error: ", error)
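
The four handlers suggest a lifecycle around each attempt. The sketch below is one plausible reading of that lifecycle, written with plain functions. It assumes the order initialization, target function, then done or error handling, then final handling, and it assumes retrying stops at the first success. The source does not state either assumption, and run_with_retry is a hypothetical helper, not the multirunnable API.

```python
events = []
attempts = {"count": 0}


def flaky() -> str:
    # Fails on the first two calls, succeeds on the third
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("Test for error")
    return "Return_Value"


def run_with_retry(function, max_attempts, initialization, done_handling,
                   error_handling, final_handling):
    """Hypothetical helper: call function up to max_attempts times with hooks."""
    for _ in range(max_attempts):
        try:
            initialization()
            result = function()
        except Exception as error:
            error_handling(error)
        else:
            done_handling(result)
            return result
        finally:
            # Runs after every attempt, successful or not
            final_handling()
    return None


result = run_with_retry(
    flaky,
    max_attempts=5,
    initialization=lambda: events.append("initial"),
    done_handling=lambda value: events.append(f"done:{value}"),
    error_handling=lambda error: events.append(f"error:{error}"),
    final_handling=lambda: events.append("final"),
)
print(events)
```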