Python ETL/ELT library powered by Apache Spark & other open-source tools.
Provides unified classes to extract data from (E) & load data to (L) various stores.
Uses the Spark DataFrame API for performing transformations (T) in terms of ETL.
Provides direct access to databases, allowing you to execute SQL queries, DDL and DML statements, and call functions/procedures. This can be used for building ELT pipelines.
Supports different read strategies for incremental and batch data fetching; both this and the previous item are sketched right after this list.
Provides a hooks & plugins mechanism for altering the behavior of internal classes.
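Here is a minimal sketch of both ideas, assuming a Postgres source and recent onETL class names; the connection details and the HWM name/expression are illustrative assumptions, not part of any real setup:

# Sketch only: direct SQL/DDL execution (ELT) and incremental reads with a read strategy.
from pyspark.sql import SparkSession

from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

spark = (
    SparkSession.builder.appName("onetl_elt_sketch")
    .config("spark.jars.packages", ",".join(Postgres.get_packages()))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="onetl",
    password="somepassword",
    database="mydb",
    spark=spark,
)

# ELT: run DDL/DML statements and SQL queries directly on the database side
postgres.execute("CREATE TABLE IF NOT EXISTS public.target_table (id bigint, value text)")
df = postgres.sql("SELECT id, value FROM public.source_table WHERE id > 1000")

# Incremental fetching: only rows with "updated_at" above the previously stored HWM are read
reader = DBReader(
    connection=postgres,
    source="public.source_table",
    hwm=DBReader.AutoDetectHWM(name="public.source_table.updated_at", expression="updated_at"),
)
with IncrementalStrategy():
    df_increment = reader.run()

Under IncrementalStrategy, the reader remembers the highest seen value of the HWM expression and fetches only newer rows on the next run.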
onETL is not a Spark replacement. It just provides additional functionality that Spark does not have, and improves UX for end users.
onETL is not a framework, as it does not impose requirements on project structure, naming, the way of running ETL/ELT processes, configuration, etc. All of that should be implemented in some other tool.
onETL is deliberately developed without any integration with scheduling software like Apache Airflow. All integrations should be implemented as separate tools.
Only batch operations, no streaming. For streaming, prefer Apache Flink.
Python 3.7 - 3.13
PySpark 2.3.x - 3.5.x (depends on used connector)
Java 8+ (required by Spark, see below)
Kerberos libs & GCC (required by Hive, HDFS and SparkHDFS connectors)
FileDownloader, FileUploader, FileMover and related classes, like file filters & limits
FileDFReader, FileDFWriter and related classes, like file formats
Read Strategies & HWM classes
Plugins support
It can be installed via:
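pip install onetl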
Warning
This method does NOT include any connections.
This method is recommended for use in third-party libraries which require onetl to be installed,
but do not use its connection classes.
With DB and FileDF connections
All DB connection classes (Clickhouse, Greenplum, Hive and others)
and all FileDF connection classes (SparkHDFS, SparkLocalFS, SparkS3)
require Spark to be installed.
First, you should install a JDK. The exact installation instructions depend on your OS; here are some examples:
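For example (package names are illustrative; pick a Java version supported by your Spark version):

# Debian-based (e.g. Ubuntu)
apt-get install openjdk-11-jdk

# CentOS / RHEL-based
dnf install java-11-openjdk-devel

Then install onETL together with PySpark by passing spark to extras:

pip install "onetl[spark]"  # install both onETL and PySpark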
You should also pass kerberos to extras to install the required Python packages:
pip install "onetl[kerberos]"
To install all connectors and dependencies, you can pass all into extras:
pip install "onetl[all]"# this is just the same as
pip install "onetl[spark,files,kerberos]"
Warning
This method consumes a lot of disk space, and requires Java & Kerberos libraries to be installed in your OS.
Read data from MSSQL, transform & write to Hive.
# install onETL and PySpark
pip install "onetl[spark]"
# Import pyspark to initialize the SparkSession
from pyspark.sql import SparkSession

# import function to setup onETL logging
from onetl.log import setup_logging

# Import required connections
from onetl.connection import MSSQL, Hive

# Import onETL classes to read & write data
from onetl.db import DBReader, DBWriter

# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize new SparkSession with MSSQL driver loaded
maven_packages = MSSQL.get_packages()

spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
.enableHiveSupport() # for Hive
.getOrCreate()
)
# Initialize MSSQL connection and check if database is accessible
mssql = MSSQL(
host="mssqldb.demo.com",
user="onetl",
password="onetl",
database="Telecom",
spark=spark,
    # These options are passed to MSSQL JDBC Driver:
    extra={"applicationIntent": "ReadOnly"},
).check()
# >>> INFO:|MSSQL| Connection is available

# Initialize DBReader
reader = DBReader(
connection=mssql,
source="dbo.demo_table",
columns=["on", "etl"],
    # Set some MSSQL read options:
    options=MSSQL.ReadOptions(fetchsize=10000),
)
# checks that there is data in the table, otherwise raises exception
reader.raise_if_no_data()

# Read data to DataFrame
df = reader.run()
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)

# Apply any PySpark transformations
from pyspark.sql.functions import lit

df_to_write = df.withColumn("engine", lit("onetl"))
df_to_write.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# |-- engine: string (nullable = false)

# Initialize Hive connection
hive = Hive(cluster="rnd-dwh", spark=spark)
# Initialize DBWriter
db_writer = DBWriter(
connection=hive,
target="dl_sb.demo_table",
    # Set some Hive write options:
    options=Hive.WriteOptions(if_exists="replace_entire_table"),
)
# Write data from DataFrame to Hive
db_writer.run(df_to_write)
# Success!
Download files from SFTP & upload them to HDFS.
# install onETL with SFTP and HDFS clients, and Kerberos support
pip install "onetl[hdfs,sftp,kerberos]"
# import function to setup onETL logging
from onetl.log import setup_logging

# Import required connections
from onetl.connection import SFTP, HDFS

# Import onETL classes to download & upload files
from onetl.file import FileDownloader, FileUploader

# import filter & limit classes
from onetl.file.filter import Glob, ExcludeDir
from onetl.file.limit import MaxFilesCount

# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize SFTP connection and check it
sftp = SFTP(
host="sftp.test.com",
user="someuser",
password="somepassword",
).check()
# >>> INFO:|SFTP| Connection is available

# Initialize downloader
file_downloader = FileDownloader(
connection=sftp,
source_path="/remote/tests/Report", # path on SFTPlocal_path="/local/onetl/Report", # local fs pathfilters=[
# download only files matching the globGlob("*.csv"),
# exclude files from this directoryExcludeDir("/remote/tests/Report/exclude_dir/"),
],
limits=[
        # download max 1000 files per run
        MaxFilesCount(1000),
],
options=FileDownloader.Options(
        # delete files from SFTP after successful download
        delete_source=True,
        # mark file as failed if it already exists in local_path
        if_exists="error",
),
)
# Download files to local filesystem
download_result = file_downloader.run()
# Method run returns a DownloadResult object,
# which contains a collection of downloaded files, divided into 4 categories
download_result

# DownloadResult(
#     successful=[
#         LocalPath('/local/onetl/Report/file_1.json'),
#         LocalPath('/local/onetl/Report/file_2.json'),
#     ],
#     failed=[FailedRemoteFile('/remote/onetl/Report/file_3.json')],
#     ignored=[RemoteFile('/remote/onetl/Report/file_4.json')],
#     missing=[],
# )

# Raise exception if there are failed files, or there were no files in the remote filesystem
download_result.raise_if_failed() or download_result.raise_if_empty()
# Do any kind of magic with files: rename files, remove header for csv files, ...
renamed_files = my_rename_function(download_result.successful)
# function removed "_" from file names# [# LocalPath('/home/onetl/Report/file1.json'),# LocalPath('/home/onetl/Report/file2.json'),# ]# Initialize HDFS connectionhdfs=HDFS(
host="my.name.node",
user="someuser",
password="somepassword", # or keytab
)
# Initialize uploader
file_uploader = FileUploader(
connection=hdfs,
target_path="/user/onetl/Report/", # hdfs path
)
# Upload files from local fs to HDFS
upload_result = file_uploader.run(renamed_files)
# Method run returns an UploadResult object,
# which contains a collection of uploaded files, divided into 4 categories
upload_result

# UploadResult(
#     successful=[RemoteFile('/user/onetl/Report/file1.json')],
#     failed=[FailedLocalFile('/local/onetl/Report/file2.json')],
#     ignored=[],
#     missing=[],
# )

# Raise exception if there are failed files, there were no files in the local filesystem, or some input file is missing
upload_result.raise_if_failed() or upload_result.raise_if_empty() or upload_result.raise_if_missing()
# Success!
Read files directly from S3 path, convert them to dataframe, transform it and then write to a database.
# install onETL and PySpark
pip install "onetl[spark]"
# Import pyspark to initialize the SparkSession
from pyspark.sql import SparkSession

# import function to setup onETL logging
from onetl.log import setup_logging

# Import required connections
from onetl.connection import Postgres, SparkS3

# Import onETL classes to read files
from onetl.file import FileDFReader
from onetl.file.format import CSV

# Import onETL classes to write data
from onetl.db import DBWriter

# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.5") + Postgres.get_packages()
exclude_packages = SparkS3.get_exclude_packages()

spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
.config("spark.jars.excludes", ",".join(exclude_packages))
.getOrCreate()
)
# Initialize S3 connection and check it
spark_s3 = SparkS3(
host="s3.test.com",
protocol="https",
bucket="my-bucket",
access_key="somekey",
secret_key="somesecret",
    # Access bucket as s3.test.com/my-bucket
    extra={"path.style.access": True},
spark=spark,
).check()
# >>> INFO:|SparkS3| Connection is available

# Describe file format and parsing options
csv = CSV(
delimiter=";",
header=True,
encoding="utf-8",
)
# Describe DataFrame schema of files
from pyspark.sql.types import (
DateType,
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
TimestampType,
)
df_schema = StructType(
[
StructField("id", IntegerType()),
StructField("phone_number", StringType()),
StructField("region", StringType()),
StructField("birth_date", DateType()),
StructField("registered_at", TimestampType()),
StructField("account_balance", DoubleType()),
],
)
# Initialize file df reader
reader = FileDFReader(
connection=spark_s3,
source_path="/remote/tests/Report", # path on S3 there *.csv files are locatedformat=csv, # file format with specific parsing optionsdf_schema=df_schema, # columns & types
)
# Read files directly from S3 as Spark DataFrame
df = reader.run()

# Check that DataFrame schema is the same as expected
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)

# Apply any PySpark transformations
from pyspark.sql.functions import lit

df_to_write = df.withColumn("engine", lit("onetl"))
df_to_write.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# |-- engine: string (nullable = false)

# Initialize Postgres connection
postgres = Postgres(
host="192.169.11.23",
user="onetl",
password="somepassword",
database="mydb",
spark=spark,
)
# Initialize DBWriter
db_writer = DBWriter(
connection=postgres,
    # write to specific table
    target="public.my_table",
    # with some writing options
    options=Postgres.WriteOptions(if_exists="append"),
)
# Write DataFrame to Postgres table
db_writer.run(df_to_write)
# Success!