
Scheduler, AKA Dagster

About

The following describes the steps and requirements for building and deploying the Docker-based workflow implemented in Dagster.

Overview

The following diagram provides a broad overview of the elements loaded into the Docker orchestration environment. This is a basic view and does not show any scaling or failover elements.

The key elements are:

  • source configurations to load into the Gleaner and Nabu tools and push to the triplestore. These are now stored in an S3 location
    • gleaner configuration: a list of sources to load. (NOTE: This is also a docker config that needs to be updated to match in order for things to work)
    • tenant configuration: a list of communities and which sources they load
    • nabu configuration
  • The Dagster stack, which loads three containers to support workflow operations
  • The Gleaner architecture images, which load three or more containers to support:
    • s3 object storage
    • graph database (triplestore)
    • headless chrome for page rendering to support dynamically inserted JSON-LD
    • any other support packages like text, semantic or spatial indexes

WORKFLOWS

There are three workflows:

  • ingest - loads sources
  • tasks - weekly tasks
  • custom (ecrr) - loads the EarthCube Resource Registry

---
title: Dagster Stack
---
flowchart LR
  subgraph docker[docker managed by portainer]
    subgraph Stacks
      subgraph dagster-compose-stack
        dagit-service
        dagster-service
        postgres-service
      end
      subgraph ingest-compose-stack
        ingest-service
        tasks-service
      end
      subgraph ec-compose-stack
        ecrr-service
      end
    end
    config
    subgraph Volumes
      dagster-postgres
    end
  end
---
title: Dagster Stack
---
flowchart LR
  subgraph DockerCompose[Docker Compose Stacks]
    maincompose[dagster/implnets/deployment/compose_project.yaml]
    project_overrides[dagster/implnets/deployment/compose_project_eco_override.yaml]
  end
  subgraph Config
    subgraph s3
      gleanconfig[gleanerconfig.yaml]
      tenant[tenant.yaml]
      nabu[nabuconfig.yaml]
    end
    subgraph Dagster/Config
      workflow[eco-wf]
      container-config-gleaner[gleanerio container config]
      container-config-nabu[gleanerio container config for nabu]
    end
    env['environment variables']
  end
  subgraph docker[docker managed by portainer]
    subgraph Containers
      subgraph dagster-compose-stack
        dagit
        dagster
        postgres
      end
      subgraph ingest-compose-stack
        ingest
        tasks
      end
      subgraph eco-compose-stack
        ecrr
      end
    end
    config
    subgraph Volumes
      dagster-postgres
    end
  end
  postgres--uses-->dagster-postgres
  dagster--uses-->workflow
  dagit--uses-->workflow
  workflow-->config
  maincompose--deploys-->dagit[dagster webserver]
  maincompose--deploys-->dagster[dagster main]
  maincompose--deploys-->ingest[gleanerio ingest code]
  maincompose--deploys-->tasks[gleanerio task code]
  project_overrides--deploys-->ecrr[earthcube code]
  ingest--reads-->gleanconfig
  ingest--reads-->tenant
  tasks--reads-->gleanconfig
  dagster--uses-->postgres

basic deployment

  1. Information for the environment variables is created.
  2. The configuration files are created and loaded to S3 and to docker/config.
  3. A docker stack for dagster/scheduler is created, and the environment variables are added.
  4. Portainer deploys the stack.
  5. A docker stack for an ingest service is created, and the environment variables are added.
  6. Portainer deploys the stack (a docker compose sketch of steps 3-6, as an alternative to Portainer, follows this list).
  7. Initial configuration jobs for ingest and tasks are executed; they read the gleaner and tenant configurations.
  8. When complete, they request loading runs for the sources from gleaner.
  9. When a loading run is complete, a sensor triggers, and a release is loaded to a tenant.
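
For reference, steps 3-6 can also be done without Portainer by deploying the two compose stacks directly. This is a minimal sketch; the compose file names follow those referenced later in this document, and the project names are illustrative:

    # sketch only: Portainer is the usual path; this assumes a filled-in .env
    cd dagster/implnets/deployment
    export $(cat .env | xargs)

    # scheduler/dagster stack (dagster webserver, dagster daemon, postgres)
    docker compose -p dagster --env-file .env -f compose_project.yaml up -d

    # ingest stack (gleanerio ingest and tasks code servers)
    docker compose -p dagster-ingest --env-file .env -f compose-project-ingest.yaml up -d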

Ingest Workflow

---
title: Ingest Workflow Sequence
---
sequenceDiagram
  participant S3
  participant Ingest
  participant Portainer
  participant Graph
  S3->>Ingest: read sources from scheduler/configs/gleanerconfig.yaml
  S3->>Ingest: read tenant from scheduler/configs/tenant.yaml
  Ingest-->Ingest: create gleanerio container
  Ingest->>Portainer: run gleanerio
  Portainer-->Portainer: docker configs mounted in gleanerio container
  Portainer-->Portainer: summon for sources
  Portainer->>S3: jsonld to s3
  Portainer->>Ingest: logs returned
  Ingest->>S3: logs from run to S3
  Ingest->>Ingest: create load reports using EC Utils
  Ingest->>S3: load reports to s3
  Ingest->>Portainer: run nabu
  Portainer-->Portainer: convert jsonld to release and release summary
  Portainer->>S3: release and release summary to s3
  Ingest->>Ingest: create graph report using EC Utils
  Ingest->>S3: graph report to s3
  Ingest->>Graph: create namespaces for tenant
  Ingest->>Graph: load release and release summary to namespaces
---
title: Ingest Simplified Flowchart
---
flowchart LR
  subgraph config
    s3_config_sensors
  end
  subgraph jobs
    summon_and_release
    tenant_release
  end
  subgraph assets
    sources
    tenants
  end
  s3_config_sensors-- monitors -->configs
  s3_config_sensors-- writes -->sources
  s3_config_sensors-- writes -->tenants
  summon_and_release-- uses -->sources-- runs -->gleanerio
  tenant_release-- uses -->tenants-- runs -->tenant_release
  gleanerio-- stores JSONLD -->summon
  gleanerio-- stores log -->logs
  summon_and_release-- reads -->summon
  summon_and_release-- converts to graph -->graph_path
  tenant_release-- monitors -->graph_path
  tenant_release-- loads releases to -->tenant_namespace
  tenant_release-- loads releases to -->tenant_summary_namespace
  subgraph portainer
    gleanerio
    tenant_ui
  end
  subgraph services
    triplestore
    tenant_namespace
    tenant_summary_namespace
  end
  subgraph minio_s3
    subgraph bucket_paths
      subgraph scheduler
        configs["`scheduler/configs`"]
        logs
      end
      summon
      graph_path['graph']
    end
  end

Task workflows

---
title: Task Workflow Sequence
---
sequenceDiagram
  participant S3
  participant Ingest
  participant Portainer
  participant Graph
  Ingest->>Ingest: all_graph_stats assets: graph statistics using EC Utils
  Ingest->>S3: load all_graph_stats to s3
  Ingest->>Ingest: source_stats assets: loadstatsHistory using EC Utils
  Ingest->>Graph: sparql query to get graph stats
  Graph->>Ingest: results for source_stats
  Ingest->>S3: source_stats to s3

Steps to build and deploy

The deployment can be developed locally. You can run jobs and materialize assets from the command line.
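
For example, a minimal sketch using the Dagster CLI; the module name comes from the workspace example in the appendix and the asset name from the task workflow above, so verify both against your checkout:

    # assumes you are in dagster/implnets/deployment with a filled-in .env
    export $(cat .env | xargs)

    # start the Dagster webserver and daemon locally against the workspace definition
    dagster dev -w workspace.yaml -p 3000

    # or run a single job / materialize an asset without the UI
    dagster job execute -m workflows.tasks.tasks -j <job_name>
    dagster asset materialize -m workflows.tasks.tasks --select all_graph_stats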

You can set up a services stack in docker to locally test, or use existing services.

The production containers (dagster, gleaner, and nabu) are built with a GitHub Action. You can also use a makefile.

This section describes the local and container deployment. We use Portainer to manage our docker deployments.

Server Deployment.

Production example for Earthcube

DEVELOPER PyCharm -- Run local with remote services

You can test components in PyCharm. Run configurations for PyCharm are in runConfigurations (TODO: instructions); use the EnvFile plugin. (Screenshot: PyCharm run configuration.)

  1. Move to the implnets/deployment directory.
  2. Copy envFile.env to .env (see the EnvFile plugin).
    1. Edit the entries to point at a Portainer/Traefik instance with running services.
  3. Edit the configuration files in implnets/configs/PROJECT: gleanerconfig.yaml, tenant.yaml.
  4. Upload the configuration files from implnets/configs/PROJECT to s3 scheduler/configs: gleanerconfig.yaml, tenant.yaml.
  5. Run a PyCharm run configuration.
    1. e.g. dagster_ingest_debug
  6. Go to http://localhost:3000/
  7. You can test the schedules.

Full stack test -- Run local with remote services

  1. Move to the implnets/deployment directory.
  2. Copy envFile.env to .env (see the EnvFile plugin).
  3. Edit the entries.
  4. Edit the configuration files in scheduler/configs/PROJECT: gleanerconfig.yaml, tenant.yaml.
  5. Upload the configuration files from scheduler/configs/PROJECT to scheduler/configs in s3: gleanerconfig.yaml, tenant.yaml.
  6. For a local run, ./dagster_localrun.sh
  7. Go to http://localhost:3000/

To deploy in Portainer, use the deployment/compose_project.yaml docker stack.

docker compose Configuration:

There are configuration files that are needed. They are installed in two places:

  • as docker configs
  • as scheduler configs in S3

(NOTE: I think the configs are still needed in the containers)

| file | local | note |
| --- | --- | --- |
| workspace | configs/local/workspace.yaml | dockerconfig: workspace; docker compose: used by dagster |
| gleanerconfig.yaml | configs/PROJECT/gleanerconfig.yaml | s3:{bucket}/scheduler/configs/gleanerconfig.yaml; ingest workflow, needs to be in minio/s3 |
| tenant.yaml | configs/PROJECT/tenant.yaml | s3:{bucket}/scheduler/configs/tenant.yaml; ingest workflow, needs to be in minio/s3 |
| dagster.yaml | dagster/implnets/deployment/dagster.yaml | dockerconfig: dagster; docker compose: used by dagster |
| gleanerconfig.yaml | configs/PROJECT/gleanerconfig.yaml | read from s3 url by gleaner |
| nabuconfig.yaml | configs/PROJECT/nabuconfig.yaml | read from s3 url by nabu |

(NOTE: There is also a gleaner config (below, in Runtime configuration) that needs to be updated to match in order for things to work.)

Docker Configs for gleanerio containers are still needed:

| file | local | stack | note |
| --- | --- | --- | --- |
| gleanerconfig.yaml | configs/PROJECT/gleanerconfig.yaml | env | |
| nabuconfig.yaml | configs/PROJECT/nabuconfig.yaml | env | |

  1. When the containers are running in a stack on Portainer, they will need to be updated by pulling from Docker Hub. The ENV variables may need to be updated for the CONTAINER*_TAG.
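
The docker configs above can be created or refreshed roughly as follows. This is a sketch only: it assumes the configs are Docker (swarm) configs named after the project, e.g. gleaner-eco following the gleaner-PROJECT pattern mentioned below; check the compose files and environment variables for the exact names:

    # docker configs are immutable, so an update means remove and re-create
    # (the config must not be in use by a running service at that moment)
    docker config create gleaner-eco configs/eco/gleanerconfig.yaml
    docker config create nabu-eco configs/eco/nabuconfig.yaml

    docker config rm gleaner-eco && docker config create gleaner-eco configs/eco/gleanerconfig.yaml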

Runtime configuration

upload to an s3 bucket

| file | s3 location | note |
| --- | --- | --- |
| gleanerconfig.yaml | s3:{bucket}/scheduler/configs/gleanerconfig.yaml | ingest workflow; needs to be in minio/s3 |
| nabuconfig.yaml | s3:{bucket}/scheduler/configs/nabuconfig.yaml | ingest workflow; needs to be in minio/s3 |
| tenant.yaml | s3:{bucket}/scheduler/configs/tenant.yaml | ingest workflow; needs to be in minio/s3 |
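
A minimal sketch of the upload using the MinIO client; the alias, endpoint, and bucket are placeholders, and the scheduler/configs prefix matches GLEANERIO_CONFIG_PATH in the environment file below:

    # 'myminio' is a placeholder alias for the MinIO/S3 endpoint
    mc alias set myminio https://minio.example.org ACCESS_KEY SECRET_KEY
    mc cp configs/eco/gleanerconfig.yaml myminio/{bucket}/scheduler/configs/gleanerconfig.yaml
    mc cp configs/eco/nabuconfig.yaml    myminio/{bucket}/scheduler/configs/nabuconfig.yaml
    mc cp configs/eco/tenant.yaml        myminio/{bucket}/scheduler/configs/tenant.yaml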

updating config

You can update a config, and a sensor should pick up the changes.

  1. Upload the changed file to s3. (Note: if this is a new source, you also need to add it to the docker config, gleaner-PROJECT.)
  2. Go to the overview page.
  3. Go to s3_config_source_sensor for gleanerconfig.yaml changes, or s3_config_tenant_sensor for tenant.yaml changes.
  4. At some point, a run should occur.
  5. Then go to the sources_sensor (or tenant sensor); if the job does not run, you can do a backfill.

new sources:

  1. Go to the job tab, and run summon_and_release with the 'partitions' aka 'sources' that are recent.
  2. Click materialize_all, and in the backfill dialog be sure only the added partition is selected.
  3. Go to runs, and see that a job with a partition with that name is queued/running.
  4. Run tenant_release_job with the same partition name to load data to tenants.

new tenants:

There are two jobs that need to run to move data to a tenant. (A third will be needed for the UI.)

  1. Go to the job tab, and run tenant_namespaces_job with the 'partitions' aka 'tenant' that are recent.
  2. Click materialize_all, and be sure only the added partition is selected.
  3. Go to runs, and see that a job with a partition with that name is queued/running.
  4. Go to the job tab, and run tenant_release_job with the 'partitions' aka 'sources' for that tenant.
  5. Click materialize_all. The data will be pushed to all tenant namespaces.

test schedules

(Screenshots: schedules tab, example schedules, select schedule, test schedule.)

Environment files

  1. cp deployment/envFile.env .env
  2. edit
  3. export $(cat .env | xargs)
    # DAGSTER_ FOR LOCAL DEVELOPMENT
    DAGSTER_HOME=dagster/dagster_home
    DAGSTER_LOCAL_ARTIFACT_STORAGE_DIR=/Users/valentin/development/dev_earthcube/scheduler/dagster/dagster_home/
    
    # dagster network and volume
    #GLEANERIO_DAGSTER_STORAGE=dagster_storage
    #GLEANERIO_DAGSTER_NETWORK=dagster_host
    
    ## PROJECT -- default 'eco' this is a 'TRAEFIK router name' use to run multiple copies of scheduler on a server
    #     originally used to generate code for a specific project
    #PROJECT=test
    
    #PROJECT=eco
    #PROJECT=iow
    #PROJECT=oih
    
    # ###
    # workspace for dagster
    ####
    GLEANERIO_WORKSPACE_CONFIG_PATH=/usr/src/app/workspace.yaml
    GLEANERIO_DOCKER_WORKSPACE_CONFIG=workspace-eco
    
    GLEANERIO_DOCKER_DAGSTER_CONFIG=dagster
    
    DEBUG_CONTAINER=false
    
    #### HOST
    #  host base name for traefik. fixed to localhost:3000 when using compose_local.
    HOST=localhost
    # Applies only to compose_project.yaml runs
    
    #  modify SCHED_HOSTNAME if you want to run more than one instance
    #    aka two different project harvests for now.
    SCHED_HOSTNAME=sched
    
    GLEANERIO_DOCKER_CONTAINER_WAIT_TIMEOUT=300
    # debugging set to 10 - 30 seconds
    
    # DEFAULT SCHEDULE
    # as defined by https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#basic-schedules
    #  "@hourly", "@daily", "@weekly", and "@monthly"
    #GLEANERIO_DEFAULT_SCHEDULE=@weekly
    #GLEANERIO_DEFAULT_SCHEDULE_TIMEZONE=America/Los_Angeles
    # the above are read via hard-coded os.getenv() calls, so when changed, the service needs to be restarted.
    
    
    # tags for docker compose
    CONTAINER_CODE_TAG=latest
    CONTAINER_DAGSTER_TAG=latest
    
    PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
    # port is required: https://portainer.{HOST}:443/api/endpoints/9/docker/
    # 9 is dataloader, 2 is aws-dev
    GLEANERIO_DOCKER_URL=https://portainer.{HOST}:443/api/endpoints/9/docker/
    GLEANERIO_PORTAINER_APIKEY=
    # if running dagster-dev, then this needs to be set ,
    #       defaults to "/scheduler/gleanerconfig.yaml" which is path to config mounted in containers
    # when debugging generated code "../../../configs/eco/gleanerconfig.yaml"
    # when debugging code in workflows "../../configs/eco/gleanerconfig.yaml"
    GLEANERIO_DAGSTER_CONFIG_PATH=../../../configs/eco/gleanerconfig.yaml
    
    # Network
    GLEANERIO_DOCKER_HEADLESS_NETWORK=headless_gleanerio
    
    ### GLEANER/NABU Dockers
    GLEANERIO_GLEANER_IMAGE=nsfearthcube/gleaner:dev_ec
    GLEANERIO_NABU_IMAGE=nsfearthcube/nabu:dev_eco
    
    
    ###
    #path in s3 for docker log files
    GLEANERIO_LOG_PREFIX=scheduler/logs/
    
    GLEANERIO_MINIO_ADDRESS=
    GLEANERIO_MINIO_PORT=80
    GLEANERIO_MINIO_USE_SSL=false
    GLEANERIO_MINIO_BUCKET=
    GLEANERIO_MINIO_ACCESS_KEY=
    GLEANERIO_MINIO_SECRET_KEY=
    #
    # where are the gleaner and tenant configurations
    GLEANERIO_CONFIG_PATH=scheduler/configs/test/
    GLEANERIO_TENANT_FILENAME=tenant.yaml
    GLEANERIO_SOURCES_FILENAME=gleanerconfig.yaml
    GLEANERIO_DOCKER_NABU_CONFIG=nabuconfig.yaml
    
    
    ###
    # graph
    ####
    # just the base address, no namespace https://graph.geocodes-aws-dev.earthcube.org/blazegraph
    GLEANERIO_GRAPH_URL=https://graph.geocodes-aws.earthcube.org/blazegraph
    GLEANERIO_GRAPH_NAMESPACE=earthcube
    
    
    GLEANERIO_CSV_CONFIG_URL=https://docs.google.com/spreadsheets/d/e/2PACX-1vTt_45dYd5LMFK9Qm_lCg6P7YxG-ae0GZEtrHMZmNbI-y5tVDd8ZLqnEeIAa-SVTSztejfZeN6xmRZF/pub?gid=1340502269&single=true&output=csv
    
    

Appendix

Portainer API setup

You will need to set up Portainer to allow API calls. To do this, see the Portainer documentation on accessing the Portainer API.
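
As a quick check that the API key works, you can list the endpoints the key can see (a sketch, assuming the portainer.{HOST} URL pattern used in GLEANERIO_DOCKER_URL above):

    # the endpoint id returned here (e.g. 9) is the one used in GLEANERIO_DOCKER_URL
    curl -s -H "X-API-Key: ${GLEANERIO_PORTAINER_APIKEY}" \
      "https://portainer.${HOST}:443/api/endpoints"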

Notes

  • Don't forget to set the DAGSTER_HOME dir, e.g.:
 export DAGSTER_HOME=/home/fils/src/Projects/gleaner.io/scheduler/python/dagster

Deploy as a single stack

We deploy as two stacks for flexibility, but you can use the multiple-file (aka override) approach to deploy compose-project.yaml and compose-project-ingest.yaml as a single stack.

docker compose -p dagster --env-file $envfile -f compose_project.yaml -f compose-project-ingest.yaml up -d

Handle Multiple Organizations

There can be multiple ingest containers. These can be used for testing and development deployments, and for multiple organizations.

The top level compose-project.yaml handles the dagster services.

  1. Deploy a compose-project-ingest.yaml stack with a different PROJECT env variable, plus the minio and graph environment variables needed to push to each community's repository.
  2. Configure the workflows configuration in dagster to include those containers as workflows.
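
For example, the per-organization environment typically differs in values like these (a sketch based on the variables in the environment file above; the values are placeholders):

    # values that usually change per organization / community
    PROJECT=oih
    GLEANERIO_MINIO_BUCKET=oih-bucket                          # placeholder bucket name
    GLEANERIO_GRAPH_NAMESPACE=oih                              # placeholder namespace
    GLEANERIO_GRAPH_URL=https://graph.example.org/blazegraph   # placeholder graph endpoint
    GLEANERIO_CONFIG_PATH=scheduler/configs/oih/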

If you need to add workflows, fork the code and add the branch to the containerize git workflow.

  • Each organization can be in a container with its own code workflow.
  • In the workflows directory: dagster project projectname
  • If we can standardize the loading and transforming workflows as much as possible, then the graph loading workflows should be standardized. We could just define an additional container in a compose file and add that to the workflows.

load_from:
#      - python_file:
#          relative_path: "project/eco/repositories/repository.py"
#          location_name: project
#          working_directory: "./project/eco/"
#      - python_file:
#          relative_path: "workflows/ecrr/repositories/repository.py"
#          working_directory: "./workflows/ecrr/"
      # module starting out with the definitions api
     # - python_module: "workflows.tasks.tasks"

      - grpc_server:
            host: dagster-code-eco-tasks
            port: 4000
            location_name: "eco-tasks"
      - grpc_server:
            host: dagster-code-eco-ingest
            port: 4000
            location_name: "eco-ingest"
      - grpc_server:
            host: dagster-code-oih--tasks
            port: 4000
            location_name: "oih-tasks"
      - grpc_server:
            host: dagster-code-oih-ingest
            port: 4000
            location_name: "oih-ingest"
      - grpc_server:
            host: dagster-code-eco-ecrr
            port: 4000
            location_name: "eco-ecrr"
  • To add a container, you need to edit the workflows.yaml in an organization's configuration.

Cron Notes

A useful on-line tool: https://crontab.cronhub.io/

| cron expression | meaning |
| --- | --- |
| 0 3 * * * | at 3 AM each day |
| 0 3,5 * * * | at 3 AM and 5 AM each day |
| 0 3 * * 0 | at 3 AM on Sunday |
| 0 3 5 * * | at 03:00 AM, on day 5 of the month |
| 0 3 5,19 * * | at 03:00 AM, on days 5 and 19 of the month |
| 0 3 1/4 * * | at 03:00 AM, every 4 days |
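
For example, to switch the default schedule from @weekly to one of the cron expressions above, set it in the environment file (as noted there, the value is read via os.getenv(), so the service needs a restart after changing it):

    # run the default schedule at 3 AM every Sunday
    GLEANERIO_DEFAULT_SCHEDULE="0 3 * * 0"
    GLEANERIO_DEFAULT_SCHEDULE_TIMEZONE=America/Los_Angeles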

Indexing Approaches

The following approaches are used:

  • Divide up the sources by sitemap and sitegraph
  • Also divide by production and queue sources

The above will result in at most 4 initial sets.

We can then use the docker approach

./gleanerDocker.sh -cfg /gleaner/wd/rundir/oih_queue.yaml  --source cioosatlantic

to run indexes on specific sources in these configuration files.